Kafka Streams’ state stores, which hold the data behind stateful operations such as aggregations and joins, are backed by Kafka changelog topics. When a Streams instance restarts without its local state, it must restore that state from the changelog topic. Log compaction on these changelog topics keeps their size bounded, which speeds up state restoration.
Let’s see it in action. Imagine a simple word count application.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;

public class WordCountChangelog {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-changelog-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Local directory for the RocksDB state stores; must exist and be writable
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        // No extra setting is needed for compaction: Kafka Streams creates the
        // changelog topics of key-value stores with cleanup.policy=compact by default.

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("text-lines");

        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            // split() yields an empty token for lines that start with punctuation
            .filter((key, word) -> !word.isEmpty())
            // groupBy takes a KeyValueMapper (key, value) -> new key; re-key by word
            .groupBy((key, word) -> word)
            // Materialize the counts in a named store; its changelog topic is
            // wordcount-changelog-app-word-counts-store-changelog
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts-store"));

        wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Wipes this instance's local state directory, forcing a full restore
        // from the changelog topic on start (handy for demos, not for production)
        streams.cleanUp();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
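When this application runs, Kafka Streams materializes the count in a local state store (RocksDB by default) and backs it with a changelog topic named <application.id>-<store name>-changelog. If count() is not given an explicit store name, Streams generates one (KSTREAM-AGGREGATE-STATE-STORE- followed by a numeric index), so naming the store with Materialized.as(...) gives a stable, readable topic name; for a store named word-counts-store, the changelog is wordcount-changelog-app-word-counts-store-changelog. Each update to a word's count is written to this changelog as a new record (with record caching enabled, the default, repeated updates to the same key may be collapsed in memory before they are flushed). Without compaction, this topic would either grow indefinitely, containing every single update ever made, or, under time-based deletion, eventually drop the only record of keys that have not changed recently.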
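To make the one-record-per-update behavior concrete, the following is a minimal in-memory sketch rather than Kafka code: the class ChangelogSimulation, its record type, and its methods are hypothetical, the store name word-counts-store is illustrative, and it assumes record caching is disabled so that every count update is logged. It also encodes the <application.id>-<store name>-changelog naming rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a count store and its changelog (not Kafka API).
public class ChangelogSimulation {

    // One changelog record: its offset, the key, and the store's new value.
    record ChangelogRecord(long offset, String key, long value) {}

    // Kafka Streams names changelog topics <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    final Map<String, Long> store = new HashMap<>();
    final List<ChangelogRecord> changelog = new ArrayList<>();

    // Counts one word: updates the local store, then appends the new value to
    // the changelog (assumes no record caching, so every update is logged).
    void countWord(String word) {
        long updated = store.merge(word, 1L, Long::sum);
        changelog.add(new ChangelogRecord(changelog.size(), word, updated));
    }

    public static void main(String[] args) {
        ChangelogSimulation sim = new ChangelogSimulation();
        for (String line : List.of("the cat", "the dog", "the end")) {
            for (String word : line.split("\\W+")) {
                sim.countWord(word);
            }
        }
        System.out.println(changelogTopic("wordcount-changelog-app", "word-counts-store"));
        System.out.println(sim.store.size() + " keys, " + sim.changelog.size() + " changelog records");
    }
}
```

Six words produce six changelog records even though only four keys exist; that gap between records and keys is exactly what compaction removes.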
The core problem changelog compaction solves is unbounded growth of the state backing topics, which leads to glacial state restoration times. When a Kafka Streams instance starts without usable local state (a new instance, a wiped state directory, or a task that migrated from another instance), it must replay a store's changelog from the beginning to rebuild it; when local state and its checkpoint file are present, it replays only the records after the checkpointed offset. If the changelog is massive, a full replay can take an unacceptably long time, prolonging application downtime. Changelog compaction ensures that, eventually, only the latest record for each key is retained in the changelog topic, drastically reducing its size and thus the time required for a full restore.
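Restoration is a replay of the changelog into the store. This hypothetical sketch (RestoreSimulation and restore are illustrative names, not Kafka API) models two cases: replaying from offset 0 when no local state exists, and, as Kafka Streams does when a local checkpoint file is present, replaying only the records after a checkpointed offset. Null values are treated as tombstones that delete a key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of state restoration by changelog replay (not Kafka API).
public class RestoreSimulation {

    record ChangelogRecord(long offset, String key, Long value) {}

    // Replays records with offset >= fromOffset into the store. fromOffset = 0
    // models a restart with no local state; a larger value models resuming from
    // a checkpointed offset. Returns the number of records replayed.
    static int restore(List<ChangelogRecord> changelog, long fromOffset, Map<String, Long> store) {
        int replayed = 0;
        for (ChangelogRecord r : changelog) {
            if (r.offset() < fromOffset) {
                continue; // already reflected in the local store
            }
            if (r.value() == null) {
                store.remove(r.key()); // tombstone: the key was deleted
            } else {
                store.put(r.key(), r.value());
            }
            replayed++;
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<ChangelogRecord> changelog = List.of(
            new ChangelogRecord(0, "the", 1L), new ChangelogRecord(1, "cat", 1L),
            new ChangelogRecord(2, "the", 2L), new ChangelogRecord(3, "dog", 1L),
            new ChangelogRecord(4, "the", 3L), new ChangelogRecord(5, "end", 1L));

        // No local state: every record must be read.
        Map<String, Long> fromScratch = new HashMap<>();
        System.out.println("full replay read " + restore(changelog, 0, fromScratch) + " records");

        // Local state already reflects offsets 0-3: only the tail is read.
        Map<String, Long> local = new HashMap<>(Map.of("the", 2L, "cat", 1L, "dog", 1L));
        System.out.println("checkpointed replay read " + restore(changelog, 4, local) + " records");
    }
}
```

The full replay cost grows with the number of records in the changelog, not the number of keys, which is why shrinking the topic through compaction pays off.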
Internally, Kafka Streams uses a Kafka topic as a durable log for its state stores. For stateful operations such as aggregations (count, reduce, aggregate), it creates a changelog topic that records every change made to the store. Because Streams creates that topic with compaction enabled, Kafka’s log compaction mechanism applies to it: for each key, the log cleaner retains only the most recent record and discards older records with the same key. A record with a null value (a tombstone) marks a key as deleted; the cleaner keeps tombstones for delete.retention.ms (24 hours by default) and then removes them as well. This effectively transforms the changelog topic from a full history of all updates into a compact representation of the current state.
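The retain-the-latest-per-key rule can be sketched in a few lines. This is a hypothetical model, not the broker's log cleaner: CompactionSketch and compact are illustrative names, and tombstones are dropped immediately instead of being held for delete.retention.ms. Surviving records keep their original offsets, as they do in a real compacted topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of key-based log compaction (not the broker's implementation).
public class CompactionSketch {

    record ChangelogRecord(long offset, String key, Long value) {}

    // Keeps only the latest record for each key, preserving offset order.
    // Simplification: tombstones (null values) are dropped right away, whereas a
    // real broker keeps them for delete.retention.ms before removing them.
    static List<ChangelogRecord> compact(List<ChangelogRecord> log) {
        // First pass: find the offset of the latest record for every key.
        Map<String, Long> latestOffset = new HashMap<>();
        for (ChangelogRecord r : log) {
            latestOffset.put(r.key(), r.offset());
        }
        // Second pass: keep a record only if it is the latest for its key.
        List<ChangelogRecord> compacted = new ArrayList<>();
        for (ChangelogRecord r : log) {
            boolean isLatest = latestOffset.get(r.key()) == r.offset();
            if (isLatest && r.value() != null) {
                compacted.add(r);
            }
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<ChangelogRecord> log = List.of(
            new ChangelogRecord(0, "the", 1L), new ChangelogRecord(1, "cat", 1L),
            new ChangelogRecord(2, "the", 2L), new ChangelogRecord(3, "dog", 1L),
            new ChangelogRecord(4, "the", 3L), new ChangelogRecord(5, "cat", null));
        for (ChangelogRecord r : compact(log)) {
            System.out.println(r.offset() + " " + r.key() + "=" + r.value());
        }
    }
}
```

Six records shrink to two: the latest value of dog and of the, while cat disappears because its latest record is a tombstone.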
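There is no state.cleanup.policy setting in StreamsConfig, and you don't need one: Kafka Streams creates the changelog topics of key-value stores with cleanup.policy=compact by default (changelogs of windowed stores use compact,delete so that expired windows are also removed). For topics you manage yourself, compaction is controlled by the topic-level cleanup.policy, which you can change with kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name <topic-name> --add-config cleanup.policy=compact; the broker-wide default for topics without an override is log.cleanup.policy in server.properties. To tune a Streams changelog (for example min.cleanable.dirty.ratio or segment.ms), pass topic configs to Materialized.withLoggingEnabled(...) for a single store, or use StreamsConfig.topicPrefix(...) to apply them to all internal topics the application creates.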
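Because cleanup.policy is a comma-separated list (Streams uses compact,delete for windowed-store changelogs), a check that a changelog is compacted should look for the compact token instead of comparing the whole string. A minimal sketch, assuming the topic's configuration has already been fetched into a Map<String, String> (for example from the Admin client's describeConfigs); CleanupPolicyCheck and isCompacted are hypothetical names, and a missing entry is assumed to mean the stock broker default, delete.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical helper: does a topic's configuration enable log compaction?
public class CleanupPolicyCheck {

    // cleanup.policy is a comma-separated list such as "compact", "delete",
    // or "compact,delete". A missing entry is treated as "delete" (assumption:
    // the broker's log.cleanup.policy default has not been changed).
    static boolean isCompacted(Map<String, String> topicConfig) {
        String policy = topicConfig.getOrDefault("cleanup.policy", "delete");
        return Arrays.stream(policy.split(","))
                .map(String::trim)
                .anyMatch("compact"::equals);
    }

    public static void main(String[] args) {
        System.out.println(isCompacted(Map.of("cleanup.policy", "compact")));
        System.out.println(isCompacted(Map.of("cleanup.policy", "compact,delete")));
        System.out.println(isCompacted(Map.of("cleanup.policy", "delete")));
        System.out.println(isCompacted(Map.of()));
    }
}
```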
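When you configure a topic for log compaction, you’re telling the broker's log cleaner threads to periodically scan the topic's closed segments and remove records whose keys appear again later in the log. The retention strategy is to keep the last record for each unique key. The active segment (the one still being appended to) is never cleaned, and a log is generally selected for cleaning only once its uncleaned ("dirty") portion exceeds min.cleanable.dirty.ratio (0.5 by default), so compaction is eventual: at any moment a changelog can still hold several values for the same key. This is what makes compaction safe for state stores: to reconstruct a store you only need the current value of each key, not the entire history of how it got there.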
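Compaction works segment by segment, and the active segment is never cleaned. This simplified, hypothetical sketch (SegmentCompactionSketch, Rec, and compact are illustrative names) treats the last segment as active and leaves it untouched, while each closed segment keeps only records that are the latest for their key within the closed segments. The real cleaner also honors settings such as min.compaction.lag.ms and may merge small segments, which this model ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of segment-aware compaction (not the broker's code).
public class SegmentCompactionSketch {

    record Rec(long offset, String key, long value) {}

    // segments are ordered oldest first; the last one is the active segment.
    // A record in a closed segment survives if it is the latest for its key
    // within the closed segments. The active segment is returned unchanged, so
    // a key updated there can still have an older copy in the cleaned part.
    static List<List<Rec>> compact(List<List<Rec>> segments) {
        List<List<Rec>> closed = segments.subList(0, segments.size() - 1);

        Map<String, Long> latest = new HashMap<>();
        for (List<Rec> segment : closed) {
            for (Rec r : segment) {
                latest.put(r.key(), r.offset());
            }
        }

        List<List<Rec>> result = new ArrayList<>();
        for (List<Rec> segment : closed) {
            List<Rec> kept = new ArrayList<>();
            for (Rec r : segment) {
                if (latest.get(r.key()) == r.offset()) {
                    kept.add(r);
                }
            }
            result.add(kept);
        }
        result.add(segments.get(segments.size() - 1)); // active segment untouched
        return result;
    }

    public static void main(String[] args) {
        List<List<Rec>> segments = List.of(
            List.of(new Rec(0, "the", 1), new Rec(1, "cat", 1)),  // closed
            List.of(new Rec(2, "the", 2), new Rec(3, "dog", 1)),  // closed
            List.of(new Rec(4, "the", 3), new Rec(5, "cat", 2))); // active
        List<List<Rec>> compacted = compact(segments);
        for (int i = 0; i < compacted.size(); i++) {
            System.out.println("segment " + i + ": " + compacted.get(i));
        }
    }
}
```

In the example, cat at offset 1 survives in the cleaned part even though a newer cat value sits in the active segment; it can only be removed after that segment rolls and a later cleaning pass runs.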
A common misconception is that changelog compaction shrinks or deletes your state stores themselves. It doesn’t. It compacts the Kafka topic that serves as the changelog of your state store. The actual state store (by default, a RocksDB instance on local disk) is a separate entity that is rebuilt from this compacted changelog topic during restoration.
Compaction itself is carried out by the broker's log cleaner, which is controlled by log.cleaner.enable. It defaults to true, but if it has been set to false on the broker, no compaction occurs, and topic-level cleanup.policy=compact settings (including those on Kafka Streams changelog topics) have no effect.
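The next problem you’ll likely encounter is managing state that is too large to fit in memory. RocksDB, the default persistent store, already keeps its data on disk, so this usually leads you to tune it through a RocksDBConfigSetter (the rocksdb.config.setter property) or to explore other StateStore implementations and custom store suppliers.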