Kafka Streams' reduce and aggregate operations both combine the records for a key into a single running result, but they solve subtly different problems, and picking the wrong one can quietly produce incorrect results.

Let’s see aggregate in action. Imagine we want to count the occurrences of each word in a stream of text messages.

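import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// streamsConfig (used below) is assumed to be a java.util.Properties defined elsewhere
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("text-input-topic", Consumed.with(Serdes.String(), Serdes.String()));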

KTable<String, Long> wordCounts = textLines
    .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
    // Re-key by word; the key changes, so spell out the serdes for the repartition topic
    .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        // Initializer: supplies the starting count (0L) the first time a key is seen
        () -> 0L,
        // Aggregator: folds each new record into the current count
        (key, word, currentCount) -> currentCount + 1,
        // Materialized: serdes for the state store that holds the counts
        Materialized.with(Serdes.String(), Serdes.Long())
    );

wordCounts.toStream().to("word-counts-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();

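Here, aggregate fits because the result type (Long) differs from the input value type (String), and each distinct word needs a running count that starts from a known value. For every new key, Kafka Streams calls the initializer to get 0 and then applies the aggregator to each record, including the first. (For plain counting, the built-in count() does the same job with less code.)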

Now, consider reduce. reduce is designed for situations where the result has the same type as the input values and no separate starting value is needed. Order-insensitive functions such as max, min, or sum are the natural fit. A classic example is finding the maximum value for each key.

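import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// streamsConfig (used below) is assumed to be a java.util.Properties defined elsewhere
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> valueStream = builder.stream("value-input-topic", Consumed.with(Serdes.String(), Serdes.Integer()));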

KTable<String, Integer> maxValuePerKey = valueStream
    .groupByKey()
    .reduce(
        // Reducer: How to combine two values for the same key
        (value1, value2) -> Math.max(value1, value2),
        // Materialized View: Store the maximums
        Materialized.with(Serdes.String(), Serdes.Integer())
    );

maxValuePerKey.toStream().to("max-value-output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();

In this reduce example, the first time a key appears, its value is the result. Subsequent values are combined with the existing result using the Math.max logic. There’s no separate "initial state" to manage; the first element bootstraps the process.

The core difference lies in how they handle the first record for a given key. aggregate explicitly defines an initializer (e.g., () -> 0L for a count) and runs the aggregator on every record, including the first. reduce implicitly uses the first encountered value as the initial state and never passes it through the reducer. If you use reduce when you really need an initial state (like a zero count), the first record's raw value ends up in your result instead of being counted. Conversely, when the first value is already a sensible starting state and the result has the same type as the input, aggregate's initializer adds ceremony without adding capability.
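To make the first-record rule concrete, here is a minimal sketch in plain Java. It is not Kafka Streams code: a HashMap stands in for the state store, the class and method names (FirstRecordModel, reduce, aggregate) are hypothetical, and the aggregator is simplified to take only the value and the current state. It assumes records for a key arrive one at a time, in order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical model of the per-key update rule; not the Kafka Streams API.
public class FirstRecordModel {

    // reduce: the first value for a key is stored as-is; the reducer
    // only runs from the second value onward.
    static <K, V> Map<K, V> reduce(List<Map.Entry<K, V>> records, BinaryOperator<V> reducer) {
        Map<K, V> store = new HashMap<>();
        for (Map.Entry<K, V> r : records) {
            V current = store.get(r.getKey());
            store.put(r.getKey(), current == null ? r.getValue() : reducer.apply(current, r.getValue()));
        }
        return store;
    }

    // aggregate: the initializer supplies the state for a new key, and the
    // aggregator runs for every record, including the first.
    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         BiFunction<V, A, A> aggregator) {
        Map<K, A> store = new HashMap<>();
        for (Map.Entry<K, V> r : records) {
            A current = store.containsKey(r.getKey()) ? store.get(r.getKey()) : initializer.get();
            store.put(r.getKey(), aggregator.apply(r.getValue(), current));
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
            List.of(Map.entry("a", 3), Map.entry("a", 7), Map.entry("b", 5));

        Map<String, Integer> max = reduce(records, Integer::max);
        Map<String, Long> counts = aggregate(records, () -> 0L, (value, count) -> count + 1);

        System.out.println(max.get("a") + " " + max.get("b"));       // prints "7 5"
        System.out.println(counts.get("a") + " " + counts.get("b")); // prints "2 1"
    }
}
```

Note that key "b" has a single record: reduce stores 5 without ever calling the reducer, while aggregate still runs the aggregator once to turn 0 into 1.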

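aggregate also has a variant that takes a merger function, but only on session-windowed streams (after groupByKey().windowedBy(SessionWindows...)). The plain aggregate on a grouped stream, as used above, takes just an initializer, an aggregator, and an optional Materialized. Session windows grow as records arrive, and a late record can bridge two sessions that were previously separate; the merger tells Kafka Streams how to combine their two aggregates into one. It is not used to combine results from different partitions: groupBy and groupByKey ensure (repartitioning if necessary) that all records for a key are processed by the same task, so each key's state lives in exactly one place. Session-windowed reduce needs no explicit merger because the reducer already knows how to combine two values of the same type. What still matters for both operations is order sensitivity: if your function's result depends on arrival order, reprocessing or out-of-order data can change the outcome.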
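In the Kafka Streams DSL, the merger argument belongs to session-windowed aggregation, where it combines the aggregates of two sessions that turn out to be one. The sketch below models that idea in plain Java for a single key. It is a simplified illustration, not the library's implementation: SessionMergeModel, Session, and add are hypothetical names, timestamps are bare longs, and the aggregate is a simple count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical single-key model of session merging; not the Kafka Streams API.
public class SessionMergeModel {

    // One session: its time bounds plus the aggregate (a record count here).
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    // Adds one record at timestamp ts. Every existing session within `gap`
    // of the record is folded into a single session using the merger.
    static List<Session> add(List<Session> sessions, long ts, long gap, BinaryOperator<Long> merger) {
        long start = ts;
        long end = ts;
        long agg = 1L; // the new record on its own: initializer (0) plus one aggregator step
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = s.start - gap <= ts && ts <= s.end + gap;
            if (withinGap) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                agg = merger.apply(agg, s.count); // merger combines two aggregates
            } else {
                result.add(s);
            }
        }
        result.add(new Session(start, end, agg));
        return result;
    }

    public static void main(String[] args) {
        List<Session> sessions = new ArrayList<>();
        sessions = add(sessions, 0L, 5L, Long::sum);  // session [0, 0]
        sessions = add(sessions, 10L, 5L, Long::sum); // too far from 0: second session [10, 10]
        sessions = add(sessions, 5L, 5L, Long::sum);  // bridges both: one session [0, 10]
        System.out.println(sessions.size() + " " + sessions.get(0).count); // prints "1 3"
    }
}
```

The third record is the interesting one: it falls within the gap of both existing sessions, so their counts have to be merged, which is the only situation the merger exists for.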

If your aggregation requires a starting point (a zero, an empty list, an initial timestamp) or a result type that differs from the input value type, use aggregate. If the first element is a valid starting state and the result has the same type as the values, reduce is more direct. aggregate is the more flexible of the two because of its explicit initializer and its freedom to choose the result type, which makes it suitable for a wider range of aggregation tasks, especially those involving state that needs to be initialized.
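A running average is a typical case that needs aggregate rather than reduce, because the state (a sum and a count) has a different type from the incoming values. The following is a minimal plain-Java sketch of that shape for a single key; AverageAggregateModel and SumAndCount are hypothetical names, and in a real topology a custom state type like this would also need its own Serde.

```java
import java.util.List;

// Hypothetical single-key model of an aggregate whose state type
// differs from the input value type; not the Kafka Streams API.
public class AverageAggregateModel {

    // Aggregate state: cannot be expressed with reduce, whose result
    // must have the same type as the Integer inputs.
    static final class SumAndCount {
        final long sum;
        final long count;

        SumAndCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        double average() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    // Start from the initializer's value, then fold in every record.
    static SumAndCount aggregate(List<Integer> values) {
        SumAndCount state = new SumAndCount(0, 0); // initializer
        for (int v : values) {
            state = new SumAndCount(state.sum + v, state.count + 1); // aggregator
        }
        return state;
    }

    public static void main(String[] args) {
        SumAndCount result = aggregate(List.of(4, 8, 6));
        System.out.println(result.average()); // prints 6.0
    }
}
```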

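The most common pitfall is using reduce for counting. A reducer like (currentCount, newValue) -> currentCount + 1 looks right, but currentCount is implicitly the first value seen, so the count starts from whatever that record contained rather than from zero. Swapping in currentCount + newValue just gives you a sum of the values. reduce only counts correctly if you first map every value to 1L and then sum; otherwise, use count() or aggregate with a () -> 0L initializer.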
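The pitfall is easy to reproduce outside Kafka. This plain-Java sketch models reduce for one key (ReduceCountPitfall and its reduce method are hypothetical names, not the library API) and runs three attempts at counting the same three records.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Hypothetical single-key model of reduce; not the Kafka Streams API.
public class ReduceCountPitfall {

    // The first value becomes the state as-is; the reducer runs afterwards.
    static Long reduce(List<Long> values, BinaryOperator<Long> reducer) {
        Long state = null;
        for (Long v : values) {
            state = (state == null) ? v : reducer.apply(state, v);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Long> payloads = List.of(40L, 2L, 8L); // three records for one key

        // Wrong: an "increment" reducer, but the state starts at the first payload (40).
        System.out.println(reduce(payloads, (count, v) -> count + 1)); // prints 42

        // Also wrong for counting: this is the sum of the payloads.
        System.out.println(reduce(payloads, Long::sum)); // prints 50

        // Works: map every record to 1L first, then sum the ones.
        List<Long> ones = payloads.stream().map(v -> 1L).collect(Collectors.toList());
        System.out.println(reduce(ones, Long::sum)); // prints 3
    }
}
```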

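Both aggregate and reduce are backed by a state store; passing a Materialized lets you configure it. Materialized.with() specifies the key and value serdes for the store. This state store is what allows Kafka Streams to maintain the aggregated results between records. If you also want to query the results from outside the topology via interactive queries, give the store an explicit name with Materialized.as(...) so that you can look it up.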

The next concept you’ll likely grapple with is how to handle windowed aggregations, where you want to aggregate data within specific time windows rather than indefinitely.
