Kafka Streams’ cooperative rebalance mechanism can drastically reduce downtime during application restarts or scaling events. Instead of stopping every instance whenever group membership changes, it lets instances keep processing the tasks they already own and hands off only the tasks that actually need to move.
Let’s see it in action. Imagine you have a Kafka Streams application that reads a page-views topic and writes a running count of page views per user to a page-view-counts topic.
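# Producer sending keyed data (key = user ID, value = page path).
# Without parse.key the records have null keys, and groupByKey() would drop them.
kafka-console-producer --bootstrap-server localhost:9092 --topic page-views \
  --property parse.key=true --property key.separator=,
>user1,/home
>user2,/about
>user1,/contact
>user3,/home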
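// Kafka Streams application (simplified)
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-counter"); // example application ID
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
builder.stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()                            // group by user ID (the record key)
    .count(Materialized.as("counts-store"))  // running count per user, kept in a local state store
    .toStream()
    .to("page-view-counts", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
// Close cleanly when the JVM shuts down (e.g. SIGTERM during a deploy)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();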
Now, let’s say you need to restart your Kafka Streams application, perhaps to deploy a new version. Under the older "eager" rebalance protocol, any change in group membership, including an instance leaving or rejoining, makes every instance revoke all of its partitions before the new assignment is handed out. Processing stops across the whole application, not just for the partitions owned by the restarting instance, until the rebalance completes.
Cooperative rebalancing changes this. When an instance leaves or a new one joins, the other instances do not give up everything they own. Each keeps processing the partitions that will stay with it, and only the partitions that must change owners pause while they move. For an instance that shuts down for a deploy, that means its partitions are redistributed to the remaining instances while their existing work continues uninterrupted.
Here’s how it works under the hood:
When an instance starts, it joins a consumer group. In Kafka, consumer groups manage partition assignments. In a traditional "eager" rebalance, whenever a member joins or leaves, every member revokes all of its partitions, and then the whole set is reassigned. All members pause processing until the new assignment is distributed, even if most of them end up with exactly the same partitions they had before.
With cooperative (incremental) rebalancing, introduced in Kafka 2.4 (KIP-429), members keep their partitions when a rebalance starts and report what they currently own. The assignor computes the target assignment, and each member revokes only the partitions that the target gives to someone else, while continuing to process the rest. If any partitions were revoked, a follow-up rebalance assigns them to their new owners. When an instance leaves, its partitions are already unowned, so they can go to the remaining instances in a single rebalance; when an instance joins, the partitions it takes over are first revoked from their current owners and then handed over in the second round.
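To make the difference concrete, here is a minimal sketch that models only one question: during a rebalance, which partitions must each surviving member stop processing? It is a hypothetical simplification (the class and method names are illustrative, not Kafka's assignor code) that takes each member's current partitions and an already-computed target assignment as input.

```java
import java.util.*;

// Hypothetical, simplified model; not Kafka's assignor implementation.
// It only computes which partitions each member revokes during one rebalance.
public class RebalanceModel {

    // Eager protocol: every member revokes everything it owns before the new
    // assignment is computed, so every member pauses.
    static Map<String, Set<Integer>> eagerRevocations(Map<String, Set<Integer>> owned) {
        Map<String, Set<Integer>> revoked = new TreeMap<>();
        owned.forEach((member, parts) -> revoked.put(member, new TreeSet<>(parts)));
        return revoked;
    }

    // Cooperative protocol: a member revokes only the partitions that the target
    // assignment gives to someone else and keeps processing everything else.
    static Map<String, Set<Integer>> cooperativeRevocations(Map<String, Set<Integer>> owned,
                                                            Map<String, Set<Integer>> target) {
        Map<String, Set<Integer>> revoked = new TreeMap<>();
        owned.forEach((member, parts) -> {
            Set<Integer> lost = new TreeSet<>(parts);
            lost.removeAll(target.getOrDefault(member, Set.of()));
            revoked.put(member, lost);
        });
        return revoked;
    }

    public static void main(String[] args) {
        // Instance C has left; A and B survive. C's partitions 4 and 5 are
        // unowned, so the target simply adds them to A and B.
        Map<String, Set<Integer>> owned = Map.of("A", Set.of(0, 1), "B", Set.of(2, 3));
        Map<String, Set<Integer>> target = Map.of("A", Set.of(0, 1, 4), "B", Set.of(2, 3, 5));
        System.out.println("eager:       " + eagerRevocations(owned));              // {A=[0, 1], B=[2, 3]}
        System.out.println("cooperative: " + cooperativeRevocations(owned, target)); // {A=[], B=[]}

        // Scale-out: D joins and the target moves partition 1 from A to D.
        // Only A revokes partition 1; D receives it in the follow-up rebalance.
        Map<String, Set<Integer>> targetWithD =
                Map.of("A", Set.of(0), "B", Set.of(2, 3), "D", Set.of(1));
        System.out.println("scale-out:   " + cooperativeRevocations(owned, targetWithD)); // {A=[1], B=[]}
    }
}
```

The sketch leaves out how the target is chosen; in Kafka Streams that is the job of its own partition assignor, which also tries to keep stateful tasks where their state already lives.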
The first setting to look at is session.timeout.ms. It determines how long the group coordinator (a broker) waits without receiving a heartbeat from a consumer (in this case, a Kafka Streams instance) before considering it dead, removing it from the group, and triggering a rebalance. It therefore bounds how long the partitions of a crashed instance go unprocessed. It also matters for restarts: with static membership (a fixed group.instance.id per instance), an instance that restarts and rejoins within session.timeout.ms gets its previous assignment back without any rebalance at all. The consumer default is 45000 ms (45 seconds) since Kafka 3.0; before that it was 10000 ms (10 seconds).
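The sketch below models two rules in plain Java: when the coordinator gives up on a silent member, and whether a restart causes a rebalance. It assumes static membership is configured via group.instance.id for the "static" case and is deliberately simplified; the class and method names are hypothetical, and the exact comparison the broker uses is not claimed.

```java
// Hypothetical model of two liveness rules; not broker or client code.
public class SessionTimeoutModel {

    // The coordinator removes a member (and starts a rebalance) once no heartbeat
    // has arrived for longer than session.timeout.ms. Whether the real check uses
    // > or >= is a detail this sketch does not claim to match.
    static boolean isExpired(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    // Simplified static membership: a member with a fixed group.instance.id that
    // comes back before its session expires reclaims its old assignment with no
    // rebalance. A dynamic member rejoins under a new member ID, which always
    // triggers a rebalance.
    static boolean restartTriggersRebalance(boolean staticMember, long downtimeMs,
                                            long sessionTimeoutMs) {
        return !staticMember || downtimeMs > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 45_000; // consumer default since Kafka 3.0

        // Crash detection: the last heartbeat arrived at t = 0.
        System.out.println(isExpired(0, 30_000, sessionTimeoutMs)); // false: still in the group
        System.out.println(isExpired(0, 46_000, sessionTimeoutMs)); // true: removed, rebalance starts

        // A rolling restart that keeps the instance down for 20 seconds.
        System.out.println(restartTriggersRebalance(true, 20_000, sessionTimeoutMs));  // false
        System.out.println(restartTriggersRebalance(false, 20_000, sessionTimeoutMs)); // true
    }
}
```

The trade-off is visible in the numbers: a longer session timeout gives restarts more room but also leaves a crashed instance's partitions idle for longer.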
Another related setting is heartbeat.interval.ms, which defines how often the consumer sends heartbeats to the coordinator (default 3000 ms). Heartbeats keep the session alive, and they are also how a member learns that a rebalance has started, so a shorter interval lets instances react to rebalances sooner. Failure detection itself is governed by session.timeout.ms: heartbeat.interval.ms must be lower than it and is typically set no higher than one third of it.
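As a small illustration, the sketch below builds a Properties object with the two settings and checks the one-third rule of thumb. The values are examples rather than recommendations, and the keys are plain strings so the snippet compiles without the Kafka client jar; in a Streams application, consumer settings like these can be placed directly in the Properties passed to KafkaStreams.

```java
import java.util.Properties;

// Illustrative values only; the class name is hypothetical.
public class HeartbeatConfig {

    // Rule of thumb from the consumer documentation: heartbeat.interval.ms
    // should be no higher than one third of session.timeout.ms.
    static boolean heartbeatIsSafe(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("session.timeout.ms", "45000");
        props.put("heartbeat.interval.ms", "3000");
        System.out.println(heartbeatIsSafe(props)); // true: 3000 * 3 <= 45000

        props.put("heartbeat.interval.ms", "20000");
        System.out.println(heartbeatIsSafe(props)); // false: 20000 * 3 > 45000
    }
}
```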
Kafka Streams has used cooperative rebalancing by default since version 2.4; it is built into the Streams partition assignor and does not depend on max.poll.interval.ms. That setting is a separate liveness check: it is the maximum time allowed between two calls to poll() before the consumer is considered stuck, leaves the group, and triggers a rebalance. The default is 300000 ms (5 minutes). If processing the records returned by one poll takes longer than that, the instance is pushed out of the group, so slow processing shows up as repeated rebalances rather than as a longer grace period.
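A quick way to reason about max.poll.interval.ms is a worst-case budget: records per poll times the time to process one record. The sketch below assumes a fixed per-record cost and a simple consume-then-process loop, which is a simplification (Kafka Streams interleaves polling and processing internally); the class name and the numbers are illustrative.

```java
// Back-of-the-envelope check under a fixed per-record cost (a simplification).
public class PollBudget {

    // Worst case: a poll returns max.poll.records records, all processed
    // before the next call to poll().
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    static boolean fitsPollInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // default: 5 minutes

        // 500 records at 2 ms each: 1 second per batch, well within budget.
        System.out.println(fitsPollInterval(500, 2, maxPollIntervalMs));    // true

        // 1000 records at 400 ms each (e.g. a slow external call): 400 seconds.
        System.out.println(fitsPollInterval(1000, 400, maxPollIntervalMs)); // false
    }
}
```

When the check fails, the usual levers are fewer records per poll (max.poll.records), faster per-record work, or a larger max.poll.interval.ms, which in turn delays detection of a genuinely stuck instance.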
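The actual handoff involves Kafka Streams’ internal state management. When a stateful task moves to a new instance, the new owner rebuilds its local state store (for example, RocksDB) by replaying the store's changelog topic before it can resume processing. The changelog is a regular, durable Kafka topic, so moving a task does not lose data; the cost is restoration time. To keep that time from turning into downtime, Kafka Streams (since 2.6, KIP-441) can leave a stateful task with its current owner while the new instance restores a warm-up copy in the background, and moves the task only once that copy is within acceptable.recovery.lag records of the end of the changelog. Standby replicas (num.standby.replicas) keep such copies up to date continuously.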
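The "move only when caught up" idea behind Kafka Streams' warm-up replicas can be sketched as a single decision. This is a hypothetical simplification, not Streams' task assignor: it assumes the current owner is still alive (a scale-out rather than a crash) and uses 10000 records, the default of the acceptable.recovery.lag config, as the threshold.

```java
// Hypothetical decision rule; not Kafka Streams' actual task assignor.
// Assumes the current owner is still running (scale-out, not a crash).
public class WarmupModel {

    // Default value of the acceptable.recovery.lag Streams config (records).
    static final long ACCEPTABLE_RECOVERY_LAG = 10_000;

    // Keep the task on its current owner until the preferred instance's restored
    // copy of the store is close enough to the changelog end offset.
    static String ownerFor(String currentOwner, String preferredOwner,
                           long changelogEndOffset, long preferredRestoredOffset) {
        long lag = changelogEndOffset - preferredRestoredOffset;
        return lag <= ACCEPTABLE_RECOVERY_LAG ? preferredOwner : currentOwner;
    }

    public static void main(String[] args) {
        // New instance B has restored only 100,000 of 1,000,000 changelog records.
        System.out.println(ownerFor("A", "B", 1_000_000, 100_000)); // A: B keeps warming up
        // At a later probing rebalance, B is 5,000 records behind.
        System.out.println(ownerFor("A", "B", 1_000_000, 995_000)); // B: task moves
    }
}
```

In the real system, Streams re-evaluates this at periodic probing rebalances (probing.rebalance.interval.ms), so the task keeps being processed by A the whole time B is catching up.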
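The most surprising thing about cooperative rebalancing is how little you need to do to get it. It is not a long-standing default for every consumer group: it arrived in Kafka 2.4, and plain consumers still use the eager protocol unless their partition.assignment.strategy lists only cooperative assignors such as CooperativeStickyAssignor. Kafka Streams, however, has used it by default since 2.4, so you don't need to enable it yourself (applications upgrading from an older version follow the two-bounce upgrade.from procedure in the Streams upgrade guide). Instead, make sure session.timeout.ms and heartbeat.interval.ms are tuned for how quickly you want failures detected, and that processing the records from a single poll stays well within max.poll.interval.ms. Committing offsets with commitSync() or commitAsync() does not reset that timer; only the next call to poll() does, and Kafka Streams makes those calls for you.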
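For comparison, a plain KafkaConsumer (not Kafka Streams) opts into the cooperative protocol through its assignor list. The sketch below only builds the Properties; the group ID is a hypothetical example, and the keys and class name are plain strings so the snippet compiles without the Kafka client jar.

```java
import java.util.Properties;

// Sketch for a plain KafkaConsumer; Kafka Streams does not need this setting.
public class CooperativeConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Listing only a cooperative assignor makes the group use the
        // cooperative protocol; an eager assignor in the list keeps it eager.
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "page-view-readers"); // hypothetical group ID
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Switching a live group from eager to cooperative is done with rolling bounces so that old and new members always share a supported assignor; the consumer upgrade notes describe the exact sequence.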
Once your cooperative rebalances are smooth, the next bottleneck is often state store restoration: if your state stores are very large, replaying their changelogs can make application startup take much longer than expected.