Introduction

Real-time analytics has become a crucial component of modern software architectures, especially when dealing with the vast amounts of data generated from many sources. Apache Kafka, a robust distributed event streaming platform, ships with Kafka Streams, a client library for processing and analyzing that data in real time.

Today, we are diving into Kafka Streams, creating a real-time analytics application that will consume a stream of sales data and compute the total sales per product category. Strap in and enjoy the ride.

Part 1: Kafka Streams 101

Before diving into the code, it’s crucial to understand the Kafka Streams architecture and its core concepts. In a nutshell, Kafka Streams lets you define a “topology” of processing nodes, where each node represents a computational step (e.g., mapping, filtering, aggregation) in your data pipeline. Kafka Streams processes data in real time, record by record, which enables highly responsive and context-aware applications.
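
To make the idea concrete, here is a minimal sketch of a two-node topology (the topic names are hypothetical and not part of our sales application): a stateless filter followed by a stateless map, wired from an input topic to an output topic.

Java
// Minimal sketch: drop empty records, then upper-case the values.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
  .filter((key, value) -> value != null && !value.isEmpty())
  .mapValues(value -> value.toUpperCase())
  .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));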

Part 2: Setting Up the Kafka Streams Application

Our real-time analytics application will use Maven for dependency management. Below are the dependencies we need (kafka-streams for the application itself, plus JUnit and kafka-streams-test-utils for the tests in Part 5):

XML
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>2.8.0</version>
    <scope>test</scope>
  </dependency>
</dependencies>

After setting up our project, we need to configure our Streams application:

Java
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "sales-analytics-app");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

Our configuration includes the application ID, which uniquely identifies the application across the Kafka cluster. We also specify the address of our Kafka broker and default SerDes (Serializer/Deserializer) classes, which determine how to convert between Java objects and bytes for network transmission.
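
These defaults can also be overridden per operation. As a small aside (with a hypothetical topic whose values are longs), passing explicit SerDes when reading a topic looks like this:

Java
// Hypothetical example: override the default String value serde for one input topic.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> amounts = builder.stream("amounts-topic", Consumed.with(Serdes.String(), Serdes.Long()));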

Next, we define our processing topology. The source stream (“sales-topic”) carries sales records keyed by product category, with the sale amount as the (string) value. We transform this stream into a table of running totals per category and write it out to “sales-per-category-topic”:

Java
final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> sourceStream = builder.stream("sales-topic");
// The sale amount arrives as a String; parse it to a Long and keep the product category (already the key) as the key.
KTable<String, Long> salesPerCategory = sourceStream
  .map((key, value) -> new KeyValue<>(key, Long.parseLong(value)))
  .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
  .reduce(Long::sum);

salesPerCategory.toStream().to("sales-per-category-topic", Produced.with(Serdes.String(), Serdes.Long()));

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();

Let’s dissect the steps:

  1. We create a new StreamsBuilder, which provides the high-level Streams DSL.
  2. We specify the source topic of our stream.
  3. We transform the data. Each sales record is mapped into a KeyValue pair with the product category as the key and the sale amount, parsed into a Long, as the value.
  4. We group the data by key (i.e., product category) and sum the sales amounts.
  5. We output the result to a new topic.
  6. We create a KafkaStreams instance and start processing (a small addition for a clean shutdown is sketched below).
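
One practical addition to step 6, sketched here as a suggestion rather than taken from the original listing, is a shutdown hook so the application closes its state stores cleanly when the JVM exits:

Java
// Close the Streams instance (and flush its state stores) on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));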

Part 3: Deep Dive into Data Processing

With the basic setup out of the way, let’s delve deeper into Kafka Streams’ data processing capabilities. Kafka Streams offers two types of data manipulation operations: Stateless and Stateful.

Stateless operations, such as map and filter, process each record independently. In contrast, stateful operations, like groupByKey and reduce, consider the context of previous records.
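
As a quick illustration (the topic name is hypothetical), mapValues below touches each record in isolation, while count needs a state store behind the scenes to remember what it has already seen per key:

Java
KStream<String, String> events = builder.stream("events-topic");

// Stateless: each record is transformed on its own, with no memory of earlier records.
KStream<String, Integer> valueLengths = events.mapValues(value -> value.length());

// Stateful: counting per key requires a state store maintained by Kafka Streams.
KTable<String, Long> eventsPerKey = events.groupByKey().count();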

In our application, we use the map operation to transform the sales records and a combination of groupByKey and reduce to aggregate the sales amounts. Let’s add a filter operation to exclude any sales transactions below a certain threshold:

Java
// Keep only sales strictly above 100; the amount is still a String at this point.
KStream<String, String> filteredStream = sourceStream.filter((key, value) -> Long.parseLong(value) > 100);
KTable<String, Long> salesPerCategory = filteredStream
  .map((key, value) -> new KeyValue<>(key, Long.parseLong(value)))
  .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
  .reduce(Long::sum);

This additional operation ensures that only transactions above $100 contribute to the sales totals.

Part 4: Enhancing the Application: Windowed Aggregations

Now, let’s say we want to analyze sales not only in total but also within specific time windows. Kafka Streams allows us to do this through windowed aggregations. We’ll modify our application to compute hourly sales totals:

Java
TimeWindows windows = TimeWindows.of(Duration.ofHours(1)).advanceBy(Duration.ofMinutes(1));

KTable<Windowed<String>, Long> windowedSalesPerCategory = filteredStream
  .map((key, value) -> new KeyValue<>(key, Long.parseLong(value)))
  .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
  .windowedBy(windows)
  .reduce(Long::sum);

windowedSalesPerCategory.toStream().to("windowed-sales-per-category-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

Here, we create a TimeWindows object that defines the window duration (1 hour) and how far each window advances (1 minute). Because the advance is smaller than the window size, these are overlapping “hopping” windows, so each record contributes to 60 of them. We apply the windowed aggregation through the windowedBy operation.
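
Note that the result is keyed by Windowed<String>, so downstream consumers see the window boundaries as part of the key. If a plain string key is easier to consume, one option (a sketch, not part of the original pipeline, with a hypothetical output topic) is to fold the window start into the key while converting back to a stream:

Java
// Flatten the windowed key into "category@windowStartEpochMs" so plain String serdes can be used downstream.
windowedSalesPerCategory
  .toStream((windowedKey, total) -> windowedKey.key() + "@" + windowedKey.window().start())
  .to("windowed-sales-flat-topic", Produced.with(Serdes.String(), Serdes.Long()));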

Part 5: Testing the Kafka Streams Application

Now that we have our streaming application, it’s time to test it. Kafka provides the TopologyTestDriver for this purpose, a powerful tool for testing your Streams applications without needing a running Kafka cluster.
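
The driver setup itself is elided in the listing below. Under the assumption that we reuse the topology from Part 2, a sketch of that setup might look like the following; it uses the pre-2.4-style ConsumerRecordFactory API (deprecated but still available in 2.8.0) to match the pipeInput calls that follow, and it relies on the kafka-streams-test-utils dependency from Part 2:

Java
// Hypothetical test setup for the topology built in Part 2.
Properties testConfig = new Properties();
testConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "sales-analytics-test");
testConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
testConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
testConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Disable record caching so every update is forwarded and visible to readOutput.
testConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), testConfig);
ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());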

We’ll write a JUnit test that verifies our application correctly calculates sales per category and excludes transactions below $100. We’ll also verify that the windowed aggregation works as expected:

Java
// Setup test driver and test data here...

// Feeding input data
testDriver.pipeInput(factory.create("sales-topic", "category1", "50"));
testDriver.pipeInput(factory.create("sales-topic", "category1", "150"));
testDriver.pipeInput(factory.create("sales-topic", "category2", "200"));
testDriver.advanceWallClockTime(Duration.ofHours(1));
testDriver.pipeInput(factory.create("sales-topic", "category1", "200"));

// Verifying the output data
ProducerRecord<String, Long> outputRecord1 = testDriver.readOutput("sales-per-category-topic", new StringDeserializer(), new LongDeserializer());
assertEquals("category1", outputRecord1.key());
assertEquals(Long.valueOf(150L), outputRecord1.value());

ProducerRecord<String, Long> outputRecord2 = testDriver.readOutput("sales-per-category-topic", new StringDeserializer(), new LongDeserializer());
assertEquals("category2", outputRecord2.key());
assertEquals(Long.valueOf(200L), outputRecord2.value());

ProducerRecord<Windowed<String>, Long> windowedOutputRecord = testDriver.readOutput("windowed-sales-per-category-topic", new TimeWindowedDeserializer<>(new StringDeserializer()), new LongDeserializer());
assertEquals("category1", windowedOutputRecord.key().key());
assertEquals(Long.valueOf(200L), windowedOutputRecord.value());

In this test, we first feed some test data into our application through the pipeInput method. We also advance the test driver’s wall-clock time to simulate the passage of time (the time windows themselves are driven by the record timestamps, which the elided setup controls). After processing the input data, we verify the output records from both the total sales topic and the windowed sales topic.
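
One housekeeping step worth adding (it is not shown in the listing above) is closing the driver after each test, so its state stores and local state directory are released:

Java
@After
public void tearDown() {
    testDriver.close(); // releases state stores and the driver's local state directory
}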

Conclusion

Congratulations! You’ve built a real-time analytics application using Kafka Streams, from scratch. In the process, you learned about the basic principles of Kafka Streams, how to create a processing topology, and how to perform stateless and stateful operations. You also dove into windowed aggregations and testing your Kafka Streams applications.

This tutorial only scratches the surface of what you can do with Kafka Streams. With its powerful DSL and intuitive API, Kafka Streams opens up a world of possibilities for real-time data processing. As you continue to explore Kafka Streams, you may find yourself tackling more complex use-cases such as joins, interactive queries, and exactly-once processing.

Remember that Kafka Streams is not a standalone service, but a part of your application. This means you can leverage all the tools and libraries from your ecosystem, leading to a simplified operational model and increased flexibility.
