Introduction

Apache Kafka is a distributed event-streaming platform designed for high-throughput, low-latency, fault-tolerant, and durable real-time data processing. It is a common backbone for data integration pipelines, event-driven architectures, and real-time analytics. In this tutorial, we build Kafka producers and consumers in Java with reliability in mind, and write unit tests to check that they behave as expected.

Setting Up the Project

To start, we’ll set up a Java project that uses Maven to manage its dependencies. Below is the dependencies section of a sample pom.xml with everything this tutorial needs:

XML
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

The kafka-clients library provides the APIs for interacting with Kafka, while JUnit lets us write and run our unit tests.

Writing a Reliable Kafka Producer

Let’s begin by creating a Kafka producer. Producers are responsible for publishing data to Kafka topics. We start by defining our producer’s properties:

Java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");   // wait for all in-sync replicas to acknowledge each write
props.put("retries", 3);    // resend up to 3 times after a transient failure

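These properties specify the Kafka broker’s address, the serializer classes that convert keys and values into bytes, the acknowledgement level, and the retry count. Setting acks to all makes the partition leader wait until all in-sync replicas have the record before confirming the write, and retries tells the producer how many times to resend a record after a transient failure such as a leader change.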
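The same settings can be written with the constants in ProducerConfig, which protects against typos in property names. The sketch below also turns on enable.idempotence, which is an assumption beyond the original configuration: it lets the broker discard duplicates that a retried send could otherwise write twice. The class and method names are hypothetical.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper that builds reliability-oriented producer settings.
final class ProducerSettings {

    private ProducerSettings() {}

    static Properties reliableStringProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Only treat a send as successful once all in-sync replicas have the record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Resend on transient errors such as a leader change.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Assumption (not in the original setup): let the broker drop duplicates
        // that retries could otherwise introduce.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return props;
    }
}
```

The resulting Properties object can be passed to new KafkaProducer<>(props) exactly like the hand-written version.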

To instantiate the Kafka producer and send a message, we use the following lines of code:

Java
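import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record);  // asynchronous: buffers the record and returns a Future
producer.close();       // waits for buffered records to complete, then releases resources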

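With these steps, we’ve handed a record for “test-topic” to the producer. The send method is asynchronous: it adds the record to an internal buffer and immediately returns a Future<RecordMetadata>, which can be used to find out whether the broker accepted the write. Calling close() blocks until previously sent records have completed, so the record is not dropped when the program exits.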
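When a caller must know that a write succeeded before moving on, it can wait on that Future. A minimal sketch, assuming a hypothetical helper named sendAndWait and a caller-chosen timeout: it blocks until the broker responds and surfaces a failed write as an ExecutionException. It accepts the Producer interface rather than KafkaProducer, so it can also be exercised with MockProducer.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical helper: send one record and block until it is acknowledged.
final class SyncSender {

    private SyncSender() {}

    static RecordMetadata sendAndWait(Producer<String, String> producer,
                                      ProducerRecord<String, String> record,
                                      long timeoutSeconds)
            throws InterruptedException, ExecutionException, TimeoutException {
        // send() returns immediately; get() waits for the acknowledgement required by "acks".
        // A failed write is rethrown as an ExecutionException wrapping the Kafka error.
        return producer.send(record).get(timeoutSeconds, TimeUnit.SECONDS);
    }
}
```

Waiting on every send serializes writes and lowers throughput, so high-volume producers usually prefer send(record, callback) and handle failures in the callback instead.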

Writing a Reliable Kafka Consumer

Next, we’ll craft a Kafka consumer to subscribe to Kafka topics and consume the published data. As we did with the producer, we start by defining the consumer’s properties:

Java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");       // commit offsets automatically...
props.put("auto.commit.interval.ms", "1000");  // ...every 1000 ms

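Here, we define the broker address, the consumer group ID (consumers that share a group.id divide the topic’s partitions among themselves), the deserializer classes for keys and values, and the offset-commit behavior: with enable.auto.commit set to true, the consumer commits its offsets automatically every auto.commit.interval.ms milliseconds, here once per second.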
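If you would rather record progress only after your own processing has finished, you can disable auto-commit and call commitSync() yourself after each batch. A sketch of such a configuration using the ConsumerConfig constants; the helper name and the choice of auto.offset.reset=earliest (start from the oldest available record when the group has no committed offset yet) are assumptions, not part of the original setup.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical helper: consumer settings for committing offsets manually.
final class ConsumerSettings {

    private ConsumerSettings() {}

    static Properties manualCommitStringConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The application commits explicitly (commitSync/commitAsync) after processing.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // With no committed offset for this group, begin at the earliest available record.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
```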

We then instantiate the Kafka consumer and subscribe to a topic:

Java
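import java.time.Duration;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    // Wait up to 100 ms for new records; an empty batch is returned on timeout.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}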

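In this loop, the consumer continuously polls the topic for new records and prints each one it receives. Because enable.auto.commit is true, offsets are not committed record by record; instead, the consumer periodically commits the positions of records returned by earlier polls as part of later poll() calls.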
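The loop above never exits and relies on auto-commit. A sketch of a variant, assuming enable.auto.commit is set to false: it calls commitSync() only after a batch has been processed, so a crash mid-batch leads to reprocessing rather than loss (at-least-once delivery), and it stops cleanly when another thread calls consumer.wakeup(), which makes poll() throw WakeupException. The class name and the list used as a stand-in for real processing are hypothetical.

```java
import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

// Hypothetical poll loop: process each batch, then commit; exit on wakeup().
final class CommittingLoop {

    private CommittingLoop() {}

    static void run(Consumer<String, String> consumer, List<String> processedValues) {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Stand-in for real processing.
                    processedValues.add(record.value());
                }
                if (!records.isEmpty()) {
                    // Commit the positions reached by this poll only after processing succeeded.
                    consumer.commitSync();
                }
            }
        } catch (WakeupException e) {
            // Expected: another thread called consumer.wakeup() to request shutdown.
        } finally {
            // Leave the group promptly so its partitions can be reassigned.
            consumer.close();
        }
    }
}
```

A shutdown hook or a controlling thread would call consumer.wakeup(); it is the one KafkaConsumer method documented as safe to call from another thread.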

Writing Unit Tests for Kafka Producers and Consumers

Testing Kafka producers and consumers can be challenging because they are asynchronous and depend on a running Kafka broker. However, the kafka-clients library ships MockProducer and MockConsumer, in-memory implementations of the Producer and Consumer interfaces that simulate a real producer and consumer without a broker, which makes unit testing much easier.

Let’s look at how to write a unit test for our Kafka producer:

Java
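import static org.junit.Assert.assertEquals;

import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

@Test
public void testKafkaProducer() {
    // autoComplete = true: every send() succeeds immediately, no broker or properties needed.
    MockProducer<String, String> mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
    mockProducer.send(record);

    // history() returns every record passed to send(), in order.
    List<ProducerRecord<String, String>> records = mockProducer.history();
    assertEquals(1, records.size());
    assertEquals("key", records.get(0).key());
    assertEquals("value", records.get(0).value());
}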
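MockProducer can also simulate a failed write, which is where reliability logic tends to hide its bugs. Constructed with autoComplete set to false, it holds each send pending until the test calls completeNext() or errorNext(RuntimeException). The sketch below uses hypothetical names and is written as plain methods so it runs without JUnit; its checks translate directly into assertEquals calls inside a @Test method.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

final class ProducerFailureCheck {

    private ProducerFailureCheck() {}

    // Hypothetical code under test: send asynchronously and keep any failure
    // reported to the callback.
    static void sendRecordingFailures(Producer<String, String> producer,
                                      ProducerRecord<String, String> record,
                                      List<Exception> failures) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                failures.add(exception);
            }
        });
    }

    // Drives a simulated broker error through MockProducer and returns what the callback saw.
    static List<Exception> simulateFailedSend() {
        // autoComplete = false: sends stay pending until completeNext() or errorNext().
        MockProducer<String, String> mockProducer =
                new MockProducer<>(false, new StringSerializer(), new StringSerializer());
        List<Exception> failures = new ArrayList<>();

        sendRecordingFailures(mockProducer, new ProducerRecord<>("test-topic", "key", "value"), failures);
        // Fail the oldest pending send, as if the broker had timed out.
        mockProducer.errorNext(new TimeoutException("simulated broker timeout"));
        return failures;
    }
}
```

errorNext() returns false when there is no pending send to fail, which quickly exposes a test that forgot to turn autoComplete off.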

Similarly, we can write a unit test for our Kafka consumer:

Java
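import static org.junit.Assert.assertEquals;

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

@Test
public void testKafkaConsumer() {
    MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

    // MockConsumer rejects records for partitions it does not own, so assign
    // the partition explicitly and tell the mock where that partition begins.
    TopicPartition partition = new TopicPartition("test-topic", 0);
    mockConsumer.assign(Collections.singletonList(partition));
    mockConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

    mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"));

    ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
    assertEquals(1, records.count());
    ConsumerRecord<String, String> record = records.iterator().next();
    assertEquals("key", record.key());
    assertEquals("value", record.value());
}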

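In both test cases, MockProducer and MockConsumer stand in for a real producer and consumer entirely in memory: MockProducer keeps every record passed to send() in history(), and MockConsumer returns the records added with addRecord() for the partitions assigned to it.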
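If the code under test calls subscribe() rather than assign(), the mock still has to be told which partitions the simulated consumer group has handed over; MockConsumer.rebalance() does that. A sketch, again written as a plain method with a hypothetical name so it runs without JUnit:

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

final class SubscribeWithMockConsumer {

    private SubscribeWithMockConsumer() {}

    // Returns the value of the single record delivered after a simulated rebalance.
    static String pollAfterRebalance() {
        MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition partition = new TopicPartition("test-topic", 0);

        // subscribe() alone assigns nothing; rebalance() simulates the group
        // coordinator giving this consumer partition 0 of the subscribed topic.
        mockConsumer.subscribe(Collections.singletonList("test-topic"));
        mockConsumer.rebalance(Collections.singletonList(partition));
        // With EARLIEST and no committed offset, the mock starts at this beginning offset.
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

        mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"));

        ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
        if (records.count() != 1) {
            throw new IllegalStateException("expected exactly one record, got " + records.count());
        }
        ConsumerRecord<String, String> record = records.iterator().next();
        mockConsumer.close();
        return record.value();
    }
}
```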

Conclusion

In this tutorial, we dove into building reliable Kafka producers and consumers using Java. We also looked at how to write unit tests for these components, which help verify that our Kafka applications behave correctly without needing a running broker.

We’ve only scratched the surface of what you can do with Apache Kafka. The platform also offers more advanced features, such as exactly-once processing semantics, log-compacted topics, and the Kafka Streams API for stream processing. As you dive deeper into Kafka, you’ll find that these features build on the same producer and consumer foundations covered here.

Remember that the key to mastering Kafka is practice and experimentation. So, don’t be afraid to get your hands dirty and try out different configurations and settings.
