Introduction
Real-time analytics has become a crucial component of modern software architectures, especially when dealing with vast amounts of data generated from various sources. Apache Kafka, a robust and efficient distributed event streaming platform, provides Kafka Streams, a library for processing and analyzing this data in real time.
Today, we are diving into Kafka Streams, creating a real-time analytics application that will consume a stream of sales data and compute the total sales per product category. Strap in and enjoy the ride.
Part 1: Kafka Streams 101
Before diving into the code, it’s crucial to understand the Kafka Streams architecture and its core concepts. In a nutshell, Kafka Streams allows you to define a “topology” of processing nodes. Each node represents a computational step (e.g., mapping, filtering, aggregation) in your data pipeline. Kafka Streams processes data in real-time, record by record, which enables highly responsive and context-aware applications.
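To make the idea concrete, the smallest useful topology reads from one topic, applies a single processing step, and writes to another. Here is a minimal sketch; the topic names are placeholders:
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")                     // source node
       .filter((key, value) -> value != null)     // one processing node
       .to("output-topic");                       // sink node
Every Kafka Streams application you write, including the one in this tutorial, is just a larger version of this pattern: sources, processing nodes, and sinks wired together.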
Part 2: Setting Up the Kafka Streams Application
Our real-time analytics application will use Maven for dependency management. Below are the necessary dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- Needed for TopologyTestDriver in Part 5 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>2.8.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
After setting up our project, we need to configure our Streams application:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "sales-analytics-app");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Our configuration includes the application ID, which uniquely identifies the application across the Kafka cluster. We also specify the address of our Kafka broker and default SerDes (Serializer/Deserializer) classes, which determine how to convert between Java objects and bytes for network transmission.
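To see what a SerDe actually does, here is a tiny sketch of a round trip through the built-in String serde (the topic name argument is only used as context by some serializers):
Serde<String> stringSerde = Serdes.String();
// Serialize a Java String into the bytes that travel over the network...
byte[] bytes = stringSerde.serializer().serialize("sales-topic", "Electronics");
// ...and turn those bytes back into a String on the consuming side
String category = stringSerde.deserializer().deserialize("sales-topic", bytes);
The defaults configured above apply whenever an operation does not specify its own serdes; we will override them per operation where our values are Longs rather than Strings.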
Next, we define our processing topology. The initial stream ("sales-topic") receives the sales data; for this tutorial we assume each record is keyed by the product category and carries the sale amount as a string value (this matches the test data we will use in Part 5). We then aggregate the stream into a table of total sales per category and publish the result to "sales-per-category-topic":
final StreamsBuilder builder = new StreamsBuilder();
// Records are keyed by product category; the value is the sale amount as a string
KStream<String, String> sourceStream = builder.stream("sales-topic");
KTable<String, Long> salesPerCategory = sourceStream
        .map((key, value) -> new KeyValue<>(key, Long.parseLong(value)))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))  // mapped values are Longs, so override the value serde
        .reduce(Long::sum);
salesPerCategory.toStream().to("sales-per-category-topic", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Let’s dissect the steps:
- We create a new StreamsBuilder, which provides the high-level Streams DSL.
- We specify the source topic ("sales-topic") of our stream.
- We transform the data: each sales record becomes a KeyValue pair with the product category as the key and the sale amount parsed as a Long.
- We group the data by key (i.e., product category) and sum the sales amounts.
- We output the result to a new topic.
- We create a KafkaStreams instance and start processing.
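Two small, optional additions are worth knowing about at this point; here is a sketch using standard Kafka Streams APIs. You can print a description of the topology the DSL has built, which is handy for debugging, and you can register a shutdown hook so the application closes cleanly:
// Inspect the topology that Kafka Streams built from the DSL
Topology topology = builder.build();
System.out.println(topology.describe());

// Close the streams instance (and release its resources) when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));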
Part 3: Deep Dive into Data Processing
With the basic setup out of the way, let’s delve deeper into Kafka Streams’ data processing capabilities. Kafka Streams offers two types of data manipulation operations: stateless and stateful. Stateless operations, such as map and filter, process each record independently. In contrast, stateful operations, like groupByKey and reduce, consider the context of previous records.
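To make the distinction concrete, here is a small sketch (the variable names are only illustrative): a stateless filter looks at the current record and nothing else, while a stateful count has to remember how many records it has already seen for each key, which Kafka Streams backs with a local state store and a changelog topic:
// Stateless: each record is evaluated on its own, no state is kept
KStream<String, String> bigSales = sourceStream.filter((key, value) -> Long.parseLong(value) > 100);

// Stateful: counting per key requires remembering previous records
KTable<String, Long> salesCountPerCategory = sourceStream.groupByKey().count();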
In our application, we use the map operation to transform the sales records and a combination of groupByKey and reduce to aggregate the sales amounts. Let’s add a filter operation to exclude any sales transactions below a certain threshold:
// Keep only the sales whose amount is above 100
KStream<String, String> filteredStream = sourceStream.filter((key, value) -> Long.parseLong(value) > 100);
KTable<String, Long> salesPerCategory = filteredStream
        .map((key, value) -> new KeyValue<>(key, Long.parseLong(value)))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum);
This additional operation ensures that only transactions above $100 contribute to the sales totals.
Part 4: Enhancing the Application: Windowed Aggregations
Now, let’s say we want to analyze sales not only in total but also within specific time windows. Kafka Streams allows us to do this through windowed aggregations. We’ll modify our application to compute sales totals over one-hour windows:
TimeWindows windows = TimeWindows.of(Duration.ofHours(1)).advanceBy(Duration.ofMinutes(1));
KTable<Windowed<String>, Long> windowedSalesPerCategory = filteredStream
        .map((key, value) -> new KeyValue<>(key, Long.parseLong(value)))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .windowedBy(windows)
        .reduce(Long::sum);
windowedSalesPerCategory.toStream().to("windowed-sales-per-category-topic",
        Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));
Here, we create a TimeWindows object that defines the window duration (1 hour) and how far the window advances each time (1 minute). We apply the windowed aggregation through the windowedBy operation.
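Because the advance interval (1 minute) is smaller than the window size (1 hour), these windows overlap: each sale can contribute to up to 60 "hopping" windows. If you only want non-overlapping hourly buckets, drop the advanceBy call. A small sketch of both variants, using the same TimeWindows API:
// Hopping windows: 1-hour windows that start every minute and therefore overlap
TimeWindows hopping = TimeWindows.of(Duration.ofHours(1)).advanceBy(Duration.ofMinutes(1));

// Tumbling windows: back-to-back, non-overlapping 1-hour buckets
TimeWindows tumbling = TimeWindows.of(Duration.ofHours(1));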
Part 5: Testing the Kafka Streams Application
Now that we have our streaming application, it’s time to test it. Kafka provides the TopologyTestDriver for this purpose, a powerful tool for testing your Streams applications without needing a running Kafka cluster.
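Before writing assertions, we need a driver and a way to create input records. A minimal setup sketch might look like the following, assuming the builder and streamsConfiguration from earlier; testDriver and factory are the names used in the test below, and ConsumerRecordFactory comes from the kafka-streams-test-utils artifact we added to the pom:
// Build the topology defined above and drive it without a real Kafka cluster
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

// Creates the ConsumerRecords we pipe in; our keys and values are both Strings
ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
Note that Kafka 2.4 introduced TestInputTopic and TestOutputTopic as the preferred testing API; the pipeInput/readOutput methods used below are deprecated in 2.8 but still work.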
We’ll write a JUnit test that verifies our application correctly calculates sales per category and excludes transactions below $100. We’ll also verify that the windowed aggregation works as expected:
// Set up the test driver and record factory here (see the setup sketch above)...

// Feeding input data: the record key is the product category, the value is the sale amount
final long startTime = 0L;  // arbitrary base timestamp chosen for the test records
testDriver.pipeInput(factory.create("sales-topic", "category1", "50", startTime));   // excluded by the filter (not above 100)
testDriver.pipeInput(factory.create("sales-topic", "category1", "150", startTime));
testDriver.pipeInput(factory.create("sales-topic", "category2", "200", startTime));
// Windowed aggregations run on record (event) time, so the last sale gets a timestamp one hour later
testDriver.pipeInput(factory.create("sales-topic", "category1", "200", startTime + Duration.ofHours(1).toMillis()));

// Verifying the output data
ProducerRecord<String, Long> outputRecord1 = testDriver.readOutput("sales-per-category-topic", new StringDeserializer(), new LongDeserializer());
assertEquals("category1", outputRecord1.key());
assertEquals(Long.valueOf(150L), outputRecord1.value());

ProducerRecord<String, Long> outputRecord2 = testDriver.readOutput("sales-per-category-topic", new StringDeserializer(), new LongDeserializer());
assertEquals("category2", outputRecord2.key());
assertEquals(Long.valueOf(200L), outputRecord2.value());

// The first record on the windowed topic is the first update produced by the 150 sale
ProducerRecord<Windowed<String>, Long> windowedOutputRecord = testDriver.readOutput("windowed-sales-per-category-topic", new TimeWindowedDeserializer<>(new StringDeserializer()), new LongDeserializer());
assertEquals("category1", windowedOutputRecord.key().key());
assertEquals(Long.valueOf(150L), windowedOutputRecord.value());
In this test, we first feed test data into our application through the pipeInput method. Because windowed aggregations are driven by record timestamps (event time) rather than wall-clock time, we give the last sale a timestamp one hour after the others to simulate the passage of time. After processing the input data, we verify the output records from both the total sales topic and the windowed sales topic.
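One detail the snippet leaves out: the test driver holds local state stores, so it should be closed when the test finishes, typically in a JUnit @After method. A small sketch using the names from the setup above:
@After
public void tearDown() {
    // Releases the driver's state stores and local working directory
    testDriver.close();
}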
Conclusion
Congratulations! You’ve built a real-time analytics application using Kafka Streams, from scratch. In the process, you learned about the basic principles of Kafka Streams, how to create a processing topology, and how to perform stateless and stateful operations. You also dove into windowed aggregations and testing your Kafka Streams applications.
This tutorial only scratches the surface of what you can do with Kafka Streams. With its powerful DSL and intuitive API, Kafka Streams opens up a world of possibilities for real-time data processing. As you continue to explore Kafka Streams, you may find yourself tackling more complex use-cases such as joins, interactive queries, and exactly-once processing.
Remember that Kafka Streams is not a standalone service, but a part of your application. This means you can leverage all the tools and libraries from your ecosystem, leading to a simplified operational model and increased flexibility.