Introduction

In today’s digital era, real-time data processing has become a necessity. Apache Kafka, a highly scalable, distributed, and fault-tolerant event streaming platform, has emerged as a key player in this space. It’s designed to handle trillions of events a day and addresses many of the common challenges of real-time data streaming.

Whether you’re a novice starting your journey or an experienced programmer looking to refresh your knowledge, this blog post will introduce you to Apache Kafka’s architecture and core concepts, and illustrate how to perform basic operations using Kafka’s APIs.

Part 1: What is Apache Kafka?

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

1. Downloading and Setting Up Kafka

First, you’ll need to download and set up Kafka. You can download a release from the Apache Kafka website and extract it:

Bash
tar -xzf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0

2. Starting Kafka Environment

Kafka uses ZooKeeper, so before starting Kafka, let’s start a ZooKeeper server:

Bash
bin/zookeeper-server-start.sh config/zookeeper.properties

Now, we can start the Kafka server:

Bash
bin/kafka-server-start.sh config/server.properties

3. Creating a Topic

Now that Kafka is up and running, let’s create a topic named “test” with a single partition and only one replica:

Bash
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
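
To verify that the topic exists, you can list the topics known to the broker:

Bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092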

Part 2: Kafka Core Concepts

Kafka revolves around the concepts of topics, partitions, producers, consumers, and brokers.

4. Producers and Consumers

Producers create new messages. Sending a few messages from the command line looks like this (newer Kafka releases prefer --bootstrap-server over --broker-list):

Bash
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

After executing the above command, type a few messages into the console; each line you enter is sent to the server as a separate message.

Consumers read messages. The command-line Kafka consumer will read the messages you just sent:

Bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
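
Run the producer and consumer in separate terminals and each line you type should appear in the consumer almost immediately. The console consumer also accepts a --group flag, which is handy for experimenting with consumer groups: consumers that share a group name split a topic’s partitions between them, so with our single-partition test topic only one member of the group would receive messages (the group name below is arbitrary):

Bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group console-group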

5. Kafka Topics and Partitions

A topic is a named category, or feed, to which messages are published and from which they are consumed. We’ve already created a “test” topic in step 3.

Partitioning splits the data of a Kafka topic across multiple brokers, allowing producers and consumers to work in parallel and increasing throughput (data redundancy, by contrast, comes from replication). You can specify the number of partitions you want for a particular topic. For example, to create a topic named partitioned_topic with 5 partitions:

Bash
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 5 --topic partitioned_topic
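
To see how the partitions are laid out (each partition’s leader, replicas, and in-sync replicas), describe the topic:

Bash
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic partitioned_topic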

6. Kafka Brokers and Clusters

A Kafka broker is a server that receives messages from producers (writers), assigns them offsets, stores them durably on disk, and serves them to consumers (readers) on request. A Kafka cluster is a set of Kafka brokers working together as a distributed system.

To start multiple brokers, we need to make a unique server.properties file for each broker. For instance, to start a second broker:

Bash
cp config/server.properties config/server-2.properties

Then we edit the new file config/server-2.properties and change the following properties:

Properties
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-2

Finally, we can start the second broker:

Bash
bin/kafka-server-start.sh config/server-2.properties
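
With two brokers running, you can create a topic that is replicated to both of them and then describe it to see which broker leads the partition (the topic name replicated_topic is just an example):

Bash
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 1 --topic replicated_topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated_topic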

Part 3: Kafka APIs

Apache Kafka has four core APIs: Producer API, Consumer API, Streams API, and Connect API.

7. Using Kafka Producer API

The Kafka Producer API allows an application to send a stream of records to one or more Kafka topics.

In Java, you create a producer by instantiating KafkaProducer and send data by invoking the send() method. Here’s a simple example:

Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();
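
Note that send() is asynchronous: it returns a Future and batches records behind the scenes. If you want to react to the broker’s acknowledgement (or to a failure), you can pass a callback as the second argument. A minimal sketch:

Java
producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();  // the send failed after any retries
    } else {
        // the record was acknowledged; metadata carries its partition and offset
        System.out.printf("partition = %d, offset = %d%n", metadata.partition(), metadata.offset());
    }
});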

8. Using Kafka Consumer API

The Kafka Consumer API allows an application to read a stream of records from one or more Kafka topics.

In Java, you create a consumer by instantiating KafkaConsumer and receive data by invoking the poll() method.

Here’s a simple example:

Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // poll(long) is deprecated in favor of poll(Duration)
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
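
The consumer above relies on automatic offset commits (enable.auto.commit=true). If you want to commit offsets only after your processing has succeeded, a common variation is to disable auto-commit and call commitSync() yourself. A rough sketch, where handleRecord is a hypothetical placeholder for your own processing logic:

Java
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        handleRecord(record);  // hypothetical: your own processing logic
    consumer.commitSync();     // commit offsets only after the whole batch is processed
}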

9. Using Kafka Streams API

The Kafka Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics.

A simple example of a Kafka Streams application:

Java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count();

wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
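
Before trying the application, it’s a good idea to create the input and output topics up front, feed some lines into TextLinesTopic with the console producer, and watch the counts arrive on WordsWithCountsTopic with a console consumer configured to print keys and deserialize the Long values:

Bash
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TextLinesTopic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic WordsWithCountsTopic

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WordsWithCountsTopic --from-beginning \
    --property print.key=true \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer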

10. Using Kafka Connect API

The Kafka Connect API allows you to build and run reusable producers or consumers (connectors) that connect Kafka topics to existing applications or data systems.

For instance, a configuration for the bundled file source connector, in the JSON format accepted by the Connect REST API, looks like:

JSON
{
    "name": "local-file-source",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/path/to/file.txt",
        "topic": "connect-test",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
    }
}

And a matching configuration for a file sink connector:

JSON
{
    "name": "local-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "file": "/path/to/file.txt",
        "topics": "connect-test",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
    }
}
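
Both JSON documents use the format accepted by the Kafka Connect REST API, which a Connect worker exposes on port 8083 by default. Assuming you saved the source configuration as local-file-source.json, you could register the connector with a running worker like this:

Bash
curl -X POST -H "Content-Type: application/json" \
     --data @local-file-source.json \
     http://localhost:8083/connectors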

Conclusion

Apache Kafka is a powerful distributed event streaming platform capable of handling trillions of events a day. It’s not only a buzzword in the big data world but also an essential component of modern data architectures.

We have gone through a quick introduction to Apache Kafka: its core concepts, its architecture, and its four core APIs. We hope the examples in each section have illustrated how Kafka works and provided a solid foundation for your further exploration.

Remember, mastering Kafka is not a one-day journey; it takes continuous learning and practice. As Kafka continues to evolve, so do its capabilities and possibilities. Keep exploring and experimenting, and you’ll discover how powerful Kafka is at managing real-time data streams.