Kafka Streams applications don’t just stop; they perform a delicate dance to ensure data integrity and avoid leaving a mess.

Let’s watch a Kafka Streams app in action. Imagine we have a simple word count example.

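```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class WordCountDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Commit state, output, and input offsets atomically via Kafka transactions
        // (requires brokers 2.5+). Graceful shutdown itself works under any guarantee.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Local directory for state stores and their checkpoint files
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-state");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("TextLinesTopic");

        source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
              .filter((key, word) -> !word.isEmpty())  // split() yields "" for leading punctuation
              .groupBy((key, word) -> word)            // re-key by word (creates a repartition topic)
              .count()                                 // KTable<String, Long> backed by a state store
              .toStream()
              // count() produces Long values, so pass a Long serde instead of relying on the String default
              .to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook: on Ctrl+C (SIGINT) or SIGTERM, close gracefully,
        // waiting at most 10 seconds instead of blocking indefinitely
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> streams.close(Duration.ofSeconds(10)), "wordcount-shutdown-hook"));

        streams.start();

        System.out.println("Kafka Streams WordCount App Started. Press Ctrl+C to stop.");
    }
}
```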

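When you run this, it reads lines from TextLinesTopic, splits them into words, counts each word, and writes the running counts to WordsWithCountsTopic. The interesting part happens when you press Ctrl+C. The JVM does not exit immediately. It first runs every registered shutdown hook, so the thread passed to addShutdownHook calls streams.close(). Hooks run on a normal exit and on SIGINT or SIGTERM, but not on kill -9, which gives the application no chance to clean up.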
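The hook in the listing is enough on its own because the stream threads are still alive after main() returns, so the JVM keeps running. The WordCountDemo that ships with Apache Kafka takes this one step further. It parks the main thread on a CountDownLatch until close() has finished, which lets main() decide what happens next, such as the exit code. The sketch below shows that pattern using only the JDK. ShutdownHooks, closeHook, and runUntilShutdown are hypothetical names. The close action is a plain Runnable so the example runs without a Kafka dependency.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper: a JDK-only version of the "close, then release a latch" shutdown pattern.
final class ShutdownHooks {
    private ShutdownHooks() {}

    // Builds (but does not register) a hook thread that runs the close action,
    // e.g. () -> streams.close(Duration.ofSeconds(10)), and then releases the latch.
    static Thread closeHook(String name, Runnable closeAction, CountDownLatch done) {
        return new Thread(() -> {
            try {
                closeAction.run();   // blocks until shutdown completes (or its own timeout expires)
            } finally {
                done.countDown();    // release main() even if the close action throws
            }
        }, name);
    }

    // Registers the hook, starts the application, and parks the calling thread
    // until the hook has run, e.g. after Ctrl+C or SIGTERM.
    static void runUntilShutdown(Runnable start, Runnable closeAction) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(closeHook("app-shutdown-hook", closeAction, done));
        start.run();
        done.await();
    }
}
```

In WordCountDemo, you could call ShutdownHooks.runUntilShutdown(streams::start, () -> streams.close(Duration.ofSeconds(10))) instead of registering the hook and calling start() directly. main() would then need to declare throws InterruptedException.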

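The streams.close() method initiates a graceful shutdown, which involves several steps. First, the stream threads stop fetching new input records. Each thread finishes the work already in progress and performs a final commit. It flushes the state-store caches and the producer, then commits the consumed offsets. Under exactly_once_v2, it does this by committing the open transaction; under at_least_once, it uses a regular offset commit. Finally, it closes its state stores, writes checkpoint files, and shuts down the underlying consumer, producer, and admin clients. The no-argument close() blocks until all of this finishes, however long that takes. close(Duration) bounds the wait and returns false if the threads have not stopped by the deadline.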
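To watch these steps happen, you can register a state listener before start() and close with a timeout. KafkaStreams.setStateListener and close(Duration) are real Kafka Streams APIs. The StreamsShutdown wrapper and its method names are hypothetical. On a graceful close of a running instance, the listener should report RUNNING -> PENDING_SHUTDOWN, followed by PENDING_SHUTDOWN -> NOT_RUNNING.

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical wrapper around two real KafkaStreams APIs: setStateListener and close(Duration).
final class StreamsShutdown {
    private StreamsShutdown() {}

    // Logs every state transition. Must be called while the instance is still CREATED
    // (before start()). Pass a thread-safe list: the listener can fire on
    // Kafka Streams' internal threads as well as on the caller's thread.
    static void recordTransitions(KafkaStreams streams, List<String> log) {
        streams.setStateListener((newState, oldState) -> {
            String transition = oldState + " -> " + newState;
            log.add(transition);
            System.out.println("Kafka Streams state: " + transition);
        });
    }

    // Closes with a bounded wait. close(Duration) returns false if the stream
    // threads had not all stopped when the timeout expired.
    static boolean closeWithin(KafkaStreams streams, Duration timeout) {
        boolean stopped = streams.close(timeout);
        if (!stopped) {
            System.err.println("Kafka Streams did not finish shutting down within " + timeout);
        }
        return stopped;
    }
}
```

In WordCountDemo, recordTransitions would go right after the KafkaStreams constructor. The shutdown hook would then call closeWithin(streams, Duration.ofSeconds(10)).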

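The state directory, configured via StreamsConfig.STATE_DIR_CONFIG, is where Kafka Streams keeps its local state stores, such as the word counts in this example. It lays them out as one subdirectory per task under state.dir/application.id. Each store is also backed by a changelog topic. Updates are written to that topic continuously as records are processed, not only at shutdown. What close() contributes is a clean stopping point: pending writes are flushed, offsets are committed, and a .checkpoint file records how far each changelog has been applied to the local store. On restart, the instance reuses its local state and replays only the changelog records after the checkpoint. If the local directory is missing, for example on a new machine, it rebuilds the store from the full changelog. Either way, no committed counts are lost.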
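To see what a clean shutdown left behind, you can look for the .checkpoint files under the application's state directory. The sketch below uses only java.nio. CheckpointInspector is a hypothetical helper. The file layout it assumes is a version line, then an entry count, then one "topic partition offset" line per changelog partition. That layout is Kafka Streams' internal checkpoint format, which is not a public API and may change between versions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical inspection helper; relies on Kafka Streams' internal (non-public) checkpoint format.
final class CheckpointInspector {
    private CheckpointInspector() {}

    // Finds every task's .checkpoint file under <state.dir>/<application.id>.
    static List<Path> findCheckpoints(Path appStateDir) throws IOException {
        try (Stream<Path> paths = Files.walk(appStateDir)) {
            return paths.filter(Files::isRegularFile)
                        .filter(p -> p.getFileName().toString().equals(".checkpoint"))
                        .sorted()
                        .collect(Collectors.toList());
        }
    }

    // Parses one checkpoint file into "topic-partition" -> changelog offset.
    // Assumed layout: line 1 = format version, line 2 = entry count,
    // then one "<topic> <partition> <offset>" line per changelog partition.
    static Map<String, Long> parse(Path checkpoint) throws IOException {
        List<String> lines = Files.readAllLines(checkpoint);
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (int i = 2; i < lines.size(); i++) {
            String[] parts = lines.get(i).trim().split("\\s+");
            if (parts.length != 3) {
                continue; // skip blank or unexpected lines rather than failing
            }
            offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        return offsets;
    }
}
```

You could point findCheckpoints at /tmp/kafka-streams-state/wordcount-application after stopping the demo. Every task directory that holds a checkpoint would then yield one offset map.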

What most people don’t realize is that the PROCESSING_GUARANTEE_CONFIG setting directly affects how state behaves around shutdowns and, more importantly, around crashes. With exactly_once_v2, Kafka Streams uses Kafka’s transactions; the older exactly_once setting is deprecated. The changelog writes, the output records, and the consumed input offsets from one commit interval are committed together. A clean close() commits the open transaction. If the process dies mid-interval instead, the transaction is aborted. Because restoration reads only committed data, the local state is rebuilt without the half-finished updates. With at_least_once (the default), a state update can reach the changelog before the corresponding input offset is committed. After a crash, those records are processed again, so some words may be counted twice.
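The difference is easiest to see with a toy model. The sketch below is not Kafka code. CrashModel is a hypothetical class that commits after every record and simulates a crash right after the state update for one record. In the at-least-once branch, that update is already durable when the crash hits, so the restarted run counts the record again. In the atomic branch, the state update and the offset commit succeed or fail together.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kafka code: one "commit" per record, one simulated crash.
final class CrashModel {
    private CrashModel() {}

    // Counts words, crashing right after the state update for index crashAt
    // (use -1 for no crash), then restarts from the last committed input offset.
    // atomicCommit = true models exactly_once_v2; false models at_least_once.
    static Map<String, Long> run(List<String> words, int crashAt, boolean atomicCommit) {
        Map<String, Long> durable = new HashMap<>(); // what the changelog holds after the crash
        int committedOffset = 0;                     // next input record to read on restart
        Map<String, Long> live = new HashMap<>();    // in-memory state during the first run

        for (int i = 0; i < words.size(); i++) {
            live.merge(words.get(i), 1L, Long::sum);
            if (!atomicCommit) {
                durable = new HashMap<>(live);       // at-least-once: the state write lands on its own
            }
            if (i == crashAt) {
                break;                               // crash before the offset commit
            }
            durable = new HashMap<>(live);           // commit point: state and offset together
            committedOffset = i + 1;
        }

        // Restart: restore the durable state and reprocess from the committed offset.
        Map<String, Long> restored = new HashMap<>(durable);
        for (int i = committedOffset; i < words.size(); i++) {
            restored.merge(words.get(i), 1L, Long::sum);
        }
        return restored;
    }
}
```

Take the input a, b, a with a crash while processing the second record. The at-least-once model ends with b counted twice (a=2, b=2), while the atomic model ends with a=2, b=1.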

The next conceptual hurdle is understanding how Kafka Streams handles rebalancing and consumer group management when multiple instances of the same application are running.
