Kafka Streams’ windowed aggregations are a powerful tool, but the interesting part isn’t just grouping data by time; it’s how the system keeps each window’s state around so that a late-arriving record can still update a result that has already been emitted.

Let’s see this in action with a simple example. Imagine we’re tracking the total quantity ordered per product within 5-minute tumbling windows.

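// Imports needed by this snippet
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Kafka Streams configuration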
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); // Local directory for state stores; must be writable

StreamsBuilder builder = new StreamsBuilder();

// Input topic: "orders" (key: product name as String, value: order quantity as Long)
KStream<String, Long> orders = builder.stream("orders");

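// Define a 5-minute tumbling window that keeps accepting late records
// for 5 more minutes of stream time after the window ends
TimeWindows tumblingWindows = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(5));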

// Sum quantities per product (key) within each tumbling window
KTable<Windowed<String>, Long> aggregatedOrders = orders
    .groupByKey() // Group by the existing key (product name)
    .windowedBy(tumblingWindows)
    .aggregate(
        () -> 0L, // Initializer
        (key, value, aggregate) -> aggregate + value, // Aggregator
        Materialized.with(Serdes.String(), Serdes.Long()) // Serdes for key and value
    );

// Output topic: "aggregated-orders"
aggregatedOrders
    .toStream((windowedKey, value) -> windowedKey.key()) // Extract the original key
    .to("aggregated-orders");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Example records sent to the "orders" topic, in arrival order (times are event timestamps):
// Key: "apple", Value: 10 (at 10:01:00)
// Key: "banana", Value: 5 (at 10:02:30)
// Key: "apple", Value: 20 (at 10:03:00)
// Key: "banana", Value: 8 (at 10:07:00) - This is in the *next* window
// Key: "apple", Value: 15 (at 10:09:00) - This is also in the next window
// Key: "apple", Value: 30 (at 10:04:50) - ***Late record: arrives last but belongs to the first window***

That last apple order for 30 arrives after the others, but its timestamp of 10:04:50 places it in the 10:00:00 to 10:05:00 window. As long as that window is still open (its grace period has not yet elapsed in stream time), Kafka Streams looks up the stored aggregate for apple in that window, which is 30 from the earlier 10 and 20 and may already have been emitted. It applies the aggregator to add the late 30, writes 60 back to the state store, and emits an updated result for that window. Nothing is recomputed from the raw records; the stored aggregate is simply revised. Downstream consumers therefore see a corrected value for the window, even with out-of-order or late data.
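The update path described above can be modelled without a broker. The following is a minimal in-memory sketch, not the Kafka Streams implementation: the class `TumblingWindowSketch`, its `HashMap` standing in for the window state store, and the `at` helper (milliseconds since midnight) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of a tumbling-window sum (hypothetical, for illustration only).
final class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    // Stored aggregate per "key@windowStart"; stands in for the window state store.
    private final Map<String, Long> state = new HashMap<>();
    // Every update that would be forwarded downstream, in processing order.
    final List<String> emitted = new ArrayList<>();

    // A tumbling window start is the timestamp rounded down to a multiple of the window size.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Read the stored aggregate for the record's window, add the value, store and emit it.
    long process(String key, long value, long timestampMs) {
        String stateKey = key + "@" + windowStart(timestampMs);
        long updated = state.getOrDefault(stateKey, 0L) + value;
        state.put(stateKey, updated);
        emitted.add(stateKey + "=" + updated);
        return updated;
    }

    // Helper: milliseconds since midnight for hh:mm:ss.
    static long at(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    public static void main(String[] args) {
        TumblingWindowSketch sketch = new TumblingWindowSketch();
        sketch.process("apple", 10, at(10, 1, 0));
        sketch.process("banana", 5, at(10, 2, 30));
        sketch.process("apple", 20, at(10, 3, 0));
        sketch.process("banana", 8, at(10, 7, 0));
        sketch.process("apple", 15, at(10, 9, 0));
        // The late record revises the first window's stored sum: 10 + 20 + 30.
        long revised = sketch.process("apple", 30, at(10, 4, 50));
        System.out.println(revised); // prints 60
    }
}
```

The sketch deliberately ignores grace periods and caching; it only shows that a late record touches the stored aggregate of its own window and produces a revised output for it.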

The core problem windowed aggregations solve is performing time-bound computations on unbounded streams. Without windows, you’d have to maintain state indefinitely or use a fixed, arbitrary grouping. Windows provide a natural, time-based boundary. Tumbling windows are like fixed-size bins; each event falls into exactly one bin based on its timestamp. Hopping windows, on the other hand, allow for overlapping bins, meaning an event can be part of multiple aggregations if its timestamp falls within multiple hopping windows.

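Internally, Kafka Streams uses RocksDB (by default) to store the state for each window. Each incoming record updates the current value in the state store, and by default every update is forwarded downstream as a revised result, subject to record caching and the commit interval. If you want a single final value per window, emitted only once the window closes, add suppress(Suppressed.untilWindowCloses(...)) to the resulting KTable. The grace period, set with TimeWindows.ofSizeAndGrace() (or the older, deprecated TimeWindows.grace()), is crucial for handling late-arriving data. It defines how far stream time, the highest record timestamp observed so far, may advance past the window’s end before the window closes. A record whose window is still open is considered "late" but is still processed and updates the stored aggregate. Once stream time reaches the window’s end plus the grace period, the window is closed and any further records for it are dropped.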
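The accept-or-drop rule can be written down in a few lines. This is a hedged sketch under simplifying assumptions: the class `GracePeriodSketch` is hypothetical, it tracks a single stream time (Kafka Streams tracks it per task), and timestamps are plain milliseconds.

```java
// Simplified model of the grace-period check (hypothetical, for illustration only).
final class GracePeriodSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream time: the highest record timestamp observed so far.
    private long streamTimeMs = Long.MIN_VALUE;

    GracePeriodSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record's window is still open, false if the record would be dropped.
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long windowCloseMs = windowStart + windowSizeMs + graceMs;
        return streamTimeMs < windowCloseMs;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Times below are minutes after 10:00:00, expressed in milliseconds.
        GracePeriodSketch noGrace = new GracePeriodSketch(5 * minute, 0L);
        noGrace.accept(1 * minute);                         // 10:01:00 advances stream time
        noGrace.accept(9 * minute);                         // 10:09:00 advances stream time
        System.out.println(noGrace.accept(4 * minute + 50_000L));   // prints false

        GracePeriodSketch withGrace = new GracePeriodSketch(5 * minute, 5 * minute);
        withGrace.accept(1 * minute);
        withGrace.accept(9 * minute);
        System.out.println(withGrace.accept(4 * minute + 50_000L)); // prints true
    }
}
```

With no grace, the 10:04:50 record is rejected because stream time has already reached 10:09:00; with a 5-minute grace the window stays open until stream time reaches 10:10:00, so the same record is accepted.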

The exact levers you control are the window size, the window type (tumbling, hopping, session), the grace period, and how you define the aggregation logic: an initializer and an aggregator, plus a merger for session windows. (A subtractor belongs to non-windowed KGroupedTable aggregations, not to windowed ones.) For hopping windows, you also define the hop size, set with advanceBy(), which is the time interval between the starts of consecutive windows. A hop size smaller than the window size creates overlaps.

The most surprising aspect of Kafka Streams windowed aggregations is how they manage state for multiple overlapping windows simultaneously when using hopping windows. When a record arrives, the system doesn’t just check which single window it belongs to; it identifies every hopping window whose time span includes the record’s timestamp. It then updates the stored aggregate for each of those windows independently, so each overlapping window’s aggregation is correctly maintained. With a 5-minute window and a 1-minute hop, for example, every record updates five windows. All of these entries live in the same window store, but state size and write volume grow roughly by the ratio of window size to hop size, which becomes significant when the hop is small relative to the window and the event rate is high.
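To make the fan-out concrete, the sketch below computes which hopping windows contain a given timestamp. It assumes windows are half-open intervals [start, start + size) with starts aligned to multiples of the hop, measured from time zero; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the hopping windows a record belongs to (hypothetical helper, for illustration only).
final class HoppingWindowSketch {
    // Start times of all windows [start, start + sizeMs) that contain timestampMs,
    // with window starts aligned to multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still cover the timestamp.
        long earliest = Math.max(0L, timestampMs - sizeMs + advanceMs);
        long start = earliest - (earliest % advanceMs);
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long tenOClock = 10 * 60 * minute;
        long ts = tenOClock + 4 * minute + 50_000L; // 10:04:50
        // 5-minute windows advancing every minute: the record falls into five windows,
        // starting at 10:00, 10:01, 10:02, 10:03 and 10:04.
        System.out.println(windowStartsFor(ts, 5 * minute, minute).size()); // prints 5
        // Hop equal to size degenerates to tumbling windows: exactly one window.
        System.out.println(windowStartsFor(ts, 5 * minute, 5 * minute).size()); // prints 1
    }
}
```

Each returned start corresponds to one aggregate that has to be read, updated, and written back for a single incoming record.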

The next concept to explore is session windows, which dynamically group records based on activity gaps rather than fixed time intervals.
