Kafka Streams is more than just a Kafka client: it's a stateful stream processing library. It runs inside your own application processes, not on the brokers, and uses the Kafka cluster for input, output, and durable state.

Let’s look at a real-world Kafka Streams application. Imagine an e-commerce platform processing clickstream data.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class ClickstreamProcessor {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Namespace for the consumer group, internal topics, and local state directory
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clickstream-processor-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092,kafka-broker-2:9092");
        // Serdes used wherever an operation does not specify its own
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CONFIG, Serdes.String().getClass().getName());
        // 10 MB record cache shared by all stream threads (deprecated since Kafka 3.4
        // in favor of STATESTORE_CACHE_MAX_BYTES_CONFIG)
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // commit every 5 seconds
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);     // 2 processing threads

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("click-events");
        // Count clicks per key (user ID); backed by a local state store and a changelog topic
        KTable<String, Long> userClickCounts = clicks
                .groupByKey()
                .count();

        // Counts are Longs, so override the default (String) value serde for the output topic
        userClickCounts.toStream().to("user-click-counts", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Close cleanly on JVM shutdown; register the hook before starting
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.cleanUp(); // development only: wipes this instance's local state directory
        streams.start();
    }
}
```

This application reads from the click-events topic, counts clicks per user (the record key), and writes the results to user-click-counts. APPLICATION_ID_CONFIG (application.id) is crucial: it is not just a name but a namespace. It serves as the consumer group ID, prefixes the names of all internal topics (such as the changelog topics for state stores), and names the local state directory.
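To make that namespacing concrete, the sketch below spells out the names derived from application.id. The InternalNames class is hypothetical (it is not part of the Kafka Streams API and never talks to Kafka); it only encodes the documented conventions: the consumer group ID equals application.id, changelog topics are named <application.id>-<store-name>-changelog, repartition topics <application.id>-<name>-repartition, and local state lives under <state.dir>/<application.id>.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper that spells out Kafka Streams naming conventions.
// It only builds strings and paths; it does not call Kafka.
final class InternalNames {
    private final String applicationId;

    InternalNames(String applicationId) {
        this.applicationId = applicationId;
    }

    // Every instance of the application joins this consumer group.
    String consumerGroupId() {
        return applicationId;
    }

    // Changelog topic backing a state store.
    String changelogTopic(String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Repartition topic created when records must be re-keyed before grouping.
    String repartitionTopic(String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Local state directory; state.dir defaults to <java.io.tmpdir>/kafka-streams.
    Path localStateDir(String stateDirRoot) {
        return Paths.get(stateDirRoot, applicationId);
    }
}
```

Two applications that accidentally share an application.id would therefore share a consumer group and each other's internal topics, which is why the ID must be unique per application.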

The core mental model of Kafka Streams revolves around topology and state. A topology is a directed acyclic graph (DAG) of processor nodes. Source processors consume records from input topics; each downstream processor receives records from its parent, performs an operation (such as filtering, mapping, or aggregation), and may update a local state store, forward records to its children, or, in the case of sink processors, write them to output topics.
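You can inspect the DAG the DSL builds by calling Topology.describe(). The sketch below rebuilds the example's topology and prints its description; it assumes the kafka-streams library (3.x) is on the classpath, and the class name TopologyInspector is illustrative. Because groupByKey() does not change the key, no repartition topic is needed, so the whole pipeline should form a single sub-topology.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

final class TopologyInspector {

    // Builds the same click-counting topology as ClickstreamProcessor.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("click-events", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()  // key unchanged, so no repartition topic
                .count()       // aggregation processor with a key-value store
                .toStream()
                .to("user-click-counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        TopologyDescription description = buildTopology().describe();
        // Prints source, processor, and sink nodes grouped by sub-topology,
        // including the state store attached to the aggregation processor.
        System.out.println(description);
    }
}
```

Printing the description is a cheap way to check whether an operation introduced an unexpected repartition step or state store before deploying.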

The StreamsBuilder defines this topology. builder.stream("click-events") creates a source processor. groupByKey() groups records by their existing key (no repartition topic is needed because the key is unchanged), and count() adds an aggregation processor backed by a local state store (RocksDB by default) that holds the running count for each key. When a record arrives, the processor reads the current count for its key from the store, increments it, and writes the updated count back.
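The read-increment-write loop can be made explicit with the Processor API. This is a hand-written sketch, not the DSL's actual internal implementation (which also handles caching, timestamps, and old/new value pairs). It assumes kafka-streams 3.x; the names CountProcessor, click-count-store, and the node names are illustrative.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Hand-written equivalent of groupByKey().count(), for illustration only.
final class CountProcessor implements Processor<String, String, String, Long> {
    static final String STORE_NAME = "click-count-store"; // illustrative name

    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        this.store = context.getStateStore(STORE_NAME);
    }

    @Override
    public void process(Record<String, String> record) {
        Long current = store.get(record.key());               // 1. read current count
        long updated = (current == null ? 0L : current) + 1;  // 2. increment
        store.put(record.key(), updated);                      // 3. write back (logged to the changelog)
        context.forward(record.withValue(updated));            // emit the new count downstream
    }

    // Wires source -> CountProcessor (with a persistent store) -> sink.
    static Topology buildTopology() {
        Topology topology = new Topology();
        topology.addSource("clicks-source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), "click-events");
        topology.addProcessor("count-processor", CountProcessor::new, "clicks-source");
        topology.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE_NAME), // RocksDB-backed
                        Serdes.String(), Serdes.Long()),
                "count-processor");
        topology.addSink("counts-sink", "user-click-counts",
                Serdes.String().serializer(), Serdes.Long().serializer(), "count-processor");
        return topology;
    }
}
```

The DSL version is usually preferable; dropping to the Processor API makes sense when you need store access patterns the DSL does not offer.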

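CACHE_MAX_BYTES_BUFFERING_CONFIG (cache.max.bytes.buffering, deprecated since Kafka 3.4 in favor of statestore.cache.max.bytes) sets the total memory, shared across all stream threads of an instance, for the record caches in front of state stores. The cache keeps only the latest value per key until it is flushed (on commit or when it fills up), so repeated updates to the same key are collapsed before they reach the store, the changelog topic, and downstream processors. A larger cache therefore reduces write and network traffic, at the cost of memory and of downstream consumers seeing updates less often. The example's 10 MB matches the default.

COMMIT_INTERVAL_MS_CONFIG (commit.interval.ms) sets how often Kafka Streams commits the offsets of processed records. Under the default at-least-once guarantee, a shorter interval means less data is reprocessed after a failure and updates appear downstream sooner, but commits (and the cache flushes that accompany them) happen more often, adding overhead on both the application and the brokers. The default is 30 seconds, or 100 ms when exactly-once processing is enabled.

NUM_STREAM_THREADS_CONFIG (num.stream.threads) sets how many processing threads run inside a single Kafka Streams instance. More threads can increase throughput while using more CPU and memory, but only up to the number of tasks: a thread with no task assigned sits idle.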
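To see why caching reduces downstream traffic, the toy model below counts the updates that leave a count() operator with and without per-key deduplication. It is plain Java, not Kafka's cache implementation, and it assumes all records arrive within one commit interval and the cache never fills up; CacheDedupModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the record cache's effect on count() output.
// Not Kafka code: it ignores cache size limits, eviction, and timestamps.
final class CacheDedupModel {

    // Without caching: every input record produces one downstream update.
    static List<String> uncachedUpdates(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            long c = counts.merge(key, 1L, Long::sum);
            emitted.add(key + "=" + c);
        }
        return emitted;
    }

    // With caching: updates accumulate per key, and only the latest value
    // per key is emitted when the cache is flushed (for example, on commit).
    static List<String> cachedUpdates(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        List<String> emitted = new ArrayList<>();
        counts.forEach((k, v) -> emitted.add(k + "=" + v)); // a single flush
        return emitted;
    }
}
```

For the input alice, bob, alice, alice, bob, the uncached model emits five updates (alice=1, bob=1, alice=2, alice=3, bob=2) while the cached model emits two (alice=3, bob=2). The cache size and commit interval together decide how much of this collapsing actually happens.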

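The cleanUp() call before streams.start() is convenient during development and testing: it deletes this instance's local state directory for the application.id, so state is rebuilt from the changelog topics on startup. It does not delete internal Kafka topics or committed offsets; resetting those requires the kafka-streams-application-reset tool. cleanUp() may only be called while the instance is not running (before start() or after close()). In production you would usually omit it, because wiping local state forces a full, potentially slow restore from the changelogs.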

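The default.key.serde and default.value.serde settings (DEFAULT_KEY_SERDE_CONFIG and DEFAULT_VALUE_SERDE_CONFIG) tell Kafka Streams how to serialize and deserialize keys and values whenever an operation does not specify serdes explicitly. In the example both defaults are Serdes.String(), which covers the input topic, while the Long counts are written with an explicit Produced.with(Serdes.String(), Serdes.Long()). For structured data you would use a JSON, Avro, or Protobuf serde (Kafka Streams itself ships no JsonSerde; libraries such as Spring Kafka provide one) or build your own from a serializer/deserializer pair.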
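Building a custom serde from a serializer/deserializer pair takes only Serdes.serdeFrom(...). The ClickEvent class and its pipe-delimited wire format below are hypothetical, chosen to keep the sketch free of dependencies beyond kafka-clients; a real application would more likely use JSON, Avro, or Protobuf.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical click event with an illustrative wire format "userId|page|timestampMs".
// Assumes userId and page never contain '|'.
final class ClickEvent {
    final String userId;
    final String page;
    final long timestampMs;

    ClickEvent(String userId, String page, long timestampMs) {
        this.userId = userId;
        this.page = page;
        this.timestampMs = timestampMs;
    }

    static Serde<ClickEvent> serde() {
        Serializer<ClickEvent> serializer = (topic, event) -> event == null
                ? null // null values (tombstones) stay null
                : (event.userId + "|" + event.page + "|" + event.timestampMs)
                        .getBytes(StandardCharsets.UTF_8);
        Deserializer<ClickEvent> deserializer = (topic, bytes) -> {
            if (bytes == null) {
                return null;
            }
            String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|", 3);
            return new ClickEvent(parts[0], parts[1], Long.parseLong(parts[2]));
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}
```

The serde would then be passed where the stream is created, for example builder.stream("click-events", Consumed.with(Serdes.String(), ClickEvent.serde())), instead of relying on the String defaults.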

The power of Kafka Streams for stateful processing lies in its automatic distribution of work and its fault tolerance. Input partitions are the unit of parallelism: Kafka Streams creates one task per input partition (per sub-topology), and each task owns the state for that partition. If click-events has 10 partitions, the application has 10 tasks, which are spread across all running instances of ClickstreamProcessor and their stream threads. If an instance fails, the consumer group rebalances, its tasks move to the remaining instances, and their state stores are restored from the changelog topics (or taken over faster by standby replicas, if num.standby.replicas is configured).
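The relationship between partitions, tasks, and threads can be illustrated with a deliberately simplified round-robin assignment. Kafka Streams' real assignor is more sophisticated (it is sticky, balances stateful tasks, and accounts for standby replicas), so this sketch, with the hypothetical TaskAssignmentSketch class, only shows why the partition count caps parallelism. Task IDs follow the "<sub-topology>_<partition>" form, such as 0_3.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration, NOT Kafka Streams' actual assignor:
// one task per input partition, dealt round-robin across all stream threads.
final class TaskAssignmentSketch {

    static Map<String, List<String>> assign(int partitions, int instances, int threadsPerInstance) {
        Map<String, List<String>> byThread = new LinkedHashMap<>();
        for (int i = 0; i < instances; i++) {
            for (int t = 0; t < threadsPerInstance; t++) {
                byThread.put("instance-" + i + "/thread-" + t, new ArrayList<>());
            }
        }
        // Same list objects as in the map, in insertion order.
        List<List<String>> threads = new ArrayList<>(byThread.values());
        for (int p = 0; p < partitions; p++) {
            String taskId = "0_" + p; // sub-topology 0, partition p
            threads.get(p % threads.size()).add(taskId);
        }
        return byThread;
    }
}
```

With 3 instances of 2 threads each, the 10 tasks are spread over 6 threads. With 6 instances (12 threads), two threads would receive no task at all, because there are only 10 partitions to process.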

Kafka Streams makes state durable by writing changes both to the local state store (usually RocksDB) and to a Kafka changelog topic. The local store alone is not fault-tolerant, since it lives on one machine's disk; the changelog topic, a compacted log of all state updates, is what makes it recoverable. When a task starts on an instance, Kafka Streams reconstructs its local state by replaying the changelog, either from the beginning or, if a local checkpoint file exists, from the last checkpointed offset. This is another reason application.id matters: it is part of every changelog topic's name.
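Restoring from a changelog amounts to replaying key/value updates in order, with the last write per key winning (which is also why compaction is safe for these topics). The toy model below expresses that in plain Java; ChangelogBackedStore is hypothetical, with a List standing in for a changelog partition and a HashMap for the local RocksDB store.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a changelog-backed store (not Kafka code).
// The List plays the changelog partition; the HashMap plays the local store.
final class ChangelogBackedStore {
    private final Map<String, Long> local = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog;

    ChangelogBackedStore(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
    }

    // Every write updates the local store and is appended to the changelog.
    void put(String key, long value) {
        local.put(key, value);
        changelog.add(new SimpleEntry<>(key, value));
    }

    Long get(String key) {
        return local.get(key);
    }

    // Rebuilds local state on a fresh instance by replaying the whole changelog.
    static ChangelogBackedStore restore(List<Map.Entry<String, Long>> changelog) {
        ChangelogBackedStore store = new ChangelogBackedStore(changelog);
        for (Map.Entry<String, Long> update : changelog) {
            store.local.put(update.getKey(), update.getValue()); // last write per key wins
        }
        return store;
    }
}
```

After writing alice=1, bob=1, alice=2 to one store, restoring from the same changelog yields alice=2 and bob=1, the same state the crashed instance had.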

When you define a KTable operation like count(), Kafka Streams creates a state store for it and logs its updates to a changelog topic named <application-id>-<store-name>-changelog. Because count() in the example is called without a store name, the name is auto-generated (of the form KSTREAM-AGGREGATE-STATE-STORE-<index>) and can shift if the topology changes. The changelog is what lets another instance rebuild exactly the same local state after a crash. commit.interval.ms matters here because each commit flushes the record cache and state stores, makes sure pending changelog and output records have been sent, and only then commits the input offsets; it therefore bounds how long updates can sit in the cache before reaching the changelog.
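Naming the store with Materialized.as(...) gives the changelog topic a stable, readable name. This sketch assumes kafka-streams 3.x; the store name user-click-counts-store and the class NamedStoreTopology are illustrative. With the example's application.id, the changelog topic would be clickstream-processor-app-user-click-counts-store-changelog.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

final class NamedStoreTopology {
    static final String STORE_NAME = "user-click-counts-store"; // illustrative

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> counts = builder
                .stream("click-events", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                // Explicit store name -> stable changelog topic
                // "<application.id>-user-click-counts-store-changelog"
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));
        counts.toStream().to("user-click-counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

A stable name also means that adding or removing unrelated operators later will not rename the store, which would otherwise orphan the old changelog and force the count to start over.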

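Output throughput also depends on the producers Kafka Streams creates internally. There is no separate setting for producer threads: under at-least-once and exactly-once v2 processing, each stream thread has its own producer, so adding stream threads (or instances) is how you add sending parallelism. The producers themselves can be tuned by passing producer configs with the producer. prefix, for example StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), to trade a little latency for larger, more efficient batches.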
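Producer settings reach the internal producers through the Streams configuration with the producer. prefix. The sketch below adds a few such settings to an existing Properties object; the helper name ProducerTuning.withProducerTuning is hypothetical, and the values are illustrative starting points for experimentation, not recommendations.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final class ProducerTuning {

    // Returns a copy of the Streams config with producer-level tuning added.
    static Properties withProducerTuning(Properties streamsProps) {
        Properties props = new Properties();
        props.putAll(streamsProps);
        // Each stream thread owns one producer, so more threads means more senders.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        // "producer.linger.ms": wait up to 20 ms to fill larger batches.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 20);
        // "producer.batch.size": allow batches of up to 64 KiB per partition.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 64 * 1024);
        // "producer.compression.type": compress batches on the wire.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "lz4");
        return props;
    }
}
```

The prefix ensures the setting applies only to the producers, not to the consumers or admin client that Kafka Streams also creates.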
