In this section, we will dive into the role of producers in Apache Kafka. Producers are responsible for publishing data records to Kafka topics. They send messages to Kafka brokers, which store and replicate the data for consumption by consumers. Understanding how to create and configure producers is essential for effectively producing data to Kafka topics.

Topics covered in this section:

  1. Configuring and creating a Kafka producer.
  2. Serializing and sending messages to Kafka topics.
  3. Understanding message keys and values.
  4. Configuring producer acknowledgments and retries.
  5. Handling producer errors and failures.
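
To make item 3 in the list above concrete: with Kafka's default partitioner, records that share the same non-null key are routed to the same partition, which is what preserves per-key ordering. The sketch below only illustrates that idea. The class name `KeyPartitionSketch`, the partition count, and the use of `Arrays.hashCode` are assumptions for illustration; Kafka itself hashes the serialized key with murmur2, so the actual partition numbers will differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified illustration only: Kafka's real default partitioner hashes the
// serialized key with murmur2; Arrays.hashCode is a stand-in used here.
final class KeyPartitionSketch {

    // Maps a record key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // hypothetical partition count for the topic
        for (String key : new String[] {"my_key", "other_key", "my_key"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Running it prints the same partition for both occurrences of `my_key`, which is the property a keyed producer relies on.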

Code Sample: Creating a Kafka Producer (Java)

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {

    public static void main(String[] args) {
        // Configure Kafka producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Produce a sample record
        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "my_key", "Hello, Kafka!");
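        // send() is asynchronous; the callback runs once the broker responds or the send fails
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent successfully. Offset: " + metadata.offset());
            } else {
                System.err.println("Failed to send message: " + exception.getMessage());
            }
        });

        // Close the producer; close() waits for previously sent records to complete
        producer.close();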
    }
}
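
The producer sample above relies on default delivery settings. As a rough sketch of what configuring acknowledgments and retries can look like, the helper below builds a `Properties` object with a few standard producer settings. The class name `ReliableProducerConfig` and every numeric value are illustrative assumptions, not recommendations; only the configuration keys themselves come from Kafka.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings that favor durability.
final class ReliableProducerConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Wait for all in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        // Retry transient send failures (value chosen for illustration).
        props.setProperty("retries", "5");
        // Pause between retry attempts, in milliseconds (illustrative value).
        props.setProperty("retry.backoff.ms", "200");
        // Upper bound on the total time to report success or failure for a send.
        props.setProperty("delivery.timeout.ms", "120000");
        // Prevent duplicates caused by retries; requires acks=all.
        props.setProperty("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings; pass them to new KafkaProducer<>(props) in a real application.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned `Properties` can be passed to `new KafkaProducer<>(props)` in place of the inline configuration shown in the sample. Failures that remain after the retries are exhausted reach the `send` callback as a non-null exception.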

Reference Link:

  • Apache Kafka documentation on producers: link

Helpful Video:

  • “Kafka Producers Explained” by Confluent: link

In this section, we will explore the role of consumers in Apache Kafka. Consumers read data from Kafka topics and process it according to their application logic. Understanding how to create and configure consumers is vital for effectively consuming and processing data from Kafka topics.

Topics covered in this section:

  1. Configuring and creating a Kafka consumer.
  2. Subscribing to Kafka topics and partitions.
  3. Polling for records and processing them.
  4. Configuring consumer offsets and commit strategies.
  5. Handling consumer rebalancing and failures.

Code Sample: Creating a Kafka Consumer (Java)

import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) {
        // Configure Kafka consumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my_consumer_group");

        // Create Kafka consumer; try-with-resources closes it if the loop exits with an exception
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to a topic
            consumer.subscribe(Collections.singleton("my_topic"));

            // Consume records; poll(Duration) replaces the deprecated poll(long)
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value() + ", Offset: " + record.offset());
                }
            }
        }
    }
}
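
The consumer sample above leaves offset handling at its defaults, which commit offsets automatically in the background. As a hedged sketch of a different commit strategy, the helper below builds a configuration that turns auto-commit off so the application decides when an offset counts as processed. The class name `ManualCommitConsumerConfig` and the chosen values are assumptions for illustration; the configuration keys are standard Kafka consumer settings.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings for manual offset commits.
final class ManualCommitConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Disable background commits; the application commits after processing.
        props.setProperty("enable.auto.commit", "false");
        // Where to start when the group has no committed offset yet.
        props.setProperty("auto.offset.reset", "earliest");
        // Cap the number of records returned by one poll (illustrative value).
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings; pass them to new KafkaConsumer<>(props) in a real application.
        build("localhost:9092", "my_consumer_group").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With this configuration, the polling loop would call `consumer.commitSync()` after it finishes processing each batch. That gives at-least-once behavior: a crash between processing and commit causes the batch to be read again after restart or rebalance.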

Reference Link:

  • Apache Kafka documentation on consumers: link

Helpful Video:

  • “Kafka Consumers Explained” by Confluent: link

Conclusion:
In this module, we explored the concepts of producers and consumers in Apache Kafka. Producers are responsible for publishing data records to Kafka topics, while consumers read and process data from Kafka topics. We learned how to create and configure Kafka producers to send messages, as well as how to create and configure Kafka consumers to receive and process messages.

By understanding the functionality and configuration options of Kafka producers and consumers, you have gained the necessary knowledge to effectively produce and consume data in Kafka. With this understanding, you are well-equipped to build real-time streaming applications using Apache Kafka and leverage its scalability, fault-tolerance, and high-performance characteristics.