Kafka Streams uses RocksDB as its default state store backend, and understanding its configuration options is crucial for performance and stability.
Let’s see it in action. Imagine a simple Kafka Streams application that counts word occurrences.
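```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // one record per word
    .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) // re-key by word
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts-store"));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig); // streamsConfig is defined below
streams.start();
```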
When this application runs, Kafka Streams creates the word-counts-store. By default, this store is backed by RocksDB, which stores and retrieves the counts efficiently. Its files live on local disk under the application's state.dir directory, nested by application ID and task ID (e.g., /tmp/kafka-streams/my-streams-app/1_0/rocksdb/word-counts-store for task 1_0), so each task owns its own RocksDB instance for the partition it processes.
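The problem this solves is maintaining state in distributed, fault-tolerant stream processing. Without state stores, each event would be processed in isolation, making aggregations like counts or joins impossible. Kafka Streams pairs RocksDB with Kafka changelog topics to provide:
- Durability: Data is written to local disk, so a restarted instance can reuse its existing state instead of rebuilding it from scratch.
- Fault Tolerance: Every update is also sent to a compacted changelog topic in Kafka. If a Kafka Streams instance fails, another instance can rebuild the store from that changelog and resume from the last committed offsets.
- Performance: RocksDB is an embedded key-value store that runs inside the application process, so state lookups avoid network round trips, and its log-structured design is optimized for fast writes.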
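To make the fault-tolerance point concrete, here is a stdlib-only sketch of what restoring a store amounts to: replaying its changelog in offset order, where the latest record for each key wins and a null value acts as a tombstone (delete). The `ChangelogEntry` type and `restore` method are hypothetical stand-ins, with a `TreeMap` playing the role of RocksDB; in a real application Kafka Streams performs restoration itself.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ChangelogRestore {
    // Hypothetical stand-in for one record in a store's changelog topic.
    static final class ChangelogEntry {
        final String key;
        final Long value; // null marks a tombstone (deletion)

        ChangelogEntry(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rebuild a store by replaying the changelog in offset order:
    // later entries overwrite earlier ones, and tombstones delete the key.
    static Map<String, Long> restore(List<ChangelogEntry> changelog) {
        Map<String, Long> store = new TreeMap<>(); // stand-in for the local RocksDB instance
        for (ChangelogEntry entry : changelog) {
            if (entry.value == null) {
                store.remove(entry.key);
            } else {
                store.put(entry.key, entry.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<ChangelogEntry> changelog = List.of(
            new ChangelogEntry("kafka", 1L),
            new ChangelogEntry("streams", 1L),
            new ChangelogEntry("kafka", 2L),
            new ChangelogEntry("streams", null)); // tombstone
        System.out.println(restore(changelog)); // prints {kafka=2}
    }
}
```

Because only the latest value per key matters, log compaction can discard older records without changing the restored result, which keeps restoration time bounded by the number of live keys rather than the full history.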
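Internally, Kafka Streams talks to RocksDB through its Java API (RocksJava, a JNI binding). An operation like count() becomes a read-modify-write: a Get for the key's current count, then a Put with the incremented value. By default, Kafka Streams' in-memory record cache sits in front of RocksDB and absorbs some of these reads and writes. Reading state directly (for example, through an interactive query) translates to Get operations. RocksDB handles the underlying complexity of indexing, compression, and compaction. Note that Kafka Streams disables RocksDB's own write-ahead log and relies on the changelog topic for recovery instead.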
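The read-modify-write pattern behind count() can be sketched without Kafka at all. In this hypothetical model, `SimpleKeyValueStore` is a made-up interface whose get/put mirror RocksDB's Get/Put, and a `HashMap` is the backing storage; the record cache and serialization layers are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

class WordCountModel {
    // Hypothetical stand-in for a key-value state store.
    interface SimpleKeyValueStore {
        Long get(String key);
        void put(String key, Long value);
    }

    static final class InMemoryStore implements SimpleKeyValueStore {
        private final Map<String, Long> data = new HashMap<>();
        @Override public Long get(String key) { return data.get(key); }
        @Override public void put(String key, Long value) { data.put(key, value); }
    }

    // Process one word the way a count() aggregation conceptually does:
    // read the current count (Get), increment it, and write it back (Put).
    static long countWord(SimpleKeyValueStore store, String word) {
        Long current = store.get(word);                      // Get
        long updated = (current == null ? 0L : current) + 1; // missing key means count 0
        store.put(word, updated);                            // Put
        return updated;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        for (String word : "the cat saw the dog".split("\\W+")) {
            countWord(store, word);
        }
        System.out.println(store.get("the")); // prints 2
    }
}
```

Every input word costs one read and one write, which is why both read-side settings (block cache) and write-side settings (write buffers) matter for aggregations.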
The main lever for tuning RocksDB is the rocksdb.config.setter property (StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG), which lets you plug in a custom implementation of the RocksDBConfigSetter interface.
Here’s how you’d configure RocksDB options:
```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.CompressionType;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    // Kafka Streams creates one setter instance per store instance, so this
    // 512 MB cache is allocated per store, not once per application
    private final Cache cache = new LRUCache(512 * 1024 * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Larger memtables absorb write bursts and lead to fewer, larger flushes
        options.setWriteBufferSize(64 * 1024 * 1024L); // 64 MB

        // Reuse the existing table config rather than creating a new one, so that
        // defaults set by Kafka Streams (such as the Bloom filter) are preserved
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache); // keep hot data blocks in memory
        tableConfig.setCacheIndexAndFilterBlocks(true); // store index/filter blocks in the cache too
        options.setTableFormatConfig(tableConfig);

        // Snappy compression trades some CPU for a smaller on-disk footprint
        options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);

        // Size the background thread pool used for flushes and compactions
        options.setIncreaseParallelism(4);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Release the cache's native memory when the store is closed
        cache.close();
    }
}
```
Then register the class in the properties you pass to KafkaStreams:
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class.getName());
```
Kafka Streams calls setConfig once for every RocksDB instance it opens: each task's copy of each store (and each segment of a windowed store) gets its own call and its own instance of your setter class. The storeName parameter lets you apply different settings to different stores in the same application, and the configs map exposes the application's Streams configuration. The Options object is where you set most of RocksDB's tuning parameters, such as writeBufferSize, maxWriteBufferNumber, and levelCompactionDynamicLevelBytes; table-level options such as the block cache and blockSize are set on the BlockBasedTableConfig attached to it.
Enabling and sizing the block cache can substantially improve read performance: it keeps frequently accessed data blocks (and, with setCacheIndexAndFilterBlocks(true), index and filter blocks) in memory, so fewer reads go to disk. Increasing the write buffer size lets RocksDB absorb bursts of writes in memory, producing fewer, larger flushes to disk and less merging work later. Compression such as Snappy shrinks the on-disk footprint at the cost of some CPU, since blocks are compressed during flushes and compactions and decompressed when read from disk. The increaseParallelism setting sizes the pool of background threads RocksDB uses for flushes and compactions, which helps on multi-core machines. Keep in mind that these settings apply per store instance, so memory use multiplies with the number of stores and partitions an application hosts.
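As a back-of-the-envelope memory check, the sketch below multiplies per-store settings by the number of store instances. It assumes, as in the config setter example, that every store instance gets its own block cache and its own memtables (up to maxWriteBufferNumber of writeBufferSize each). The instance count of 10 and maxWriteBufferNumber of 3 are hypothetical inputs, and other RocksDB memory (for example, blocks pinned by iterators) is ignored.

```java
class RocksDbMemoryEstimate {
    static final long MB = 1024L * 1024L;

    // Rough upper bound on off-heap memory when every store instance owns
    // its own block cache and up to maxWriteBufferNumber memtables.
    static long estimateBytes(int storeInstances, long blockCacheBytes,
                              long writeBufferBytes, int maxWriteBufferNumber) {
        long perStore = blockCacheBytes + writeBufferBytes * maxWriteBufferNumber;
        return storeInstances * perStore;
    }

    public static void main(String[] args) {
        // Hypothetical deployment: 10 store instances (e.g., one store over 10 partitions),
        // a 512 MB cache and 64 MB write buffers, with 3 memtables assumed per store.
        long bytes = estimateBytes(10, 512 * MB, 64 * MB, 3);
        System.out.println(bytes / MB + " MB"); // prints 7040 MB
    }
}
```

If a total like this is too large, sharing a single static cache across all stores bounds it, but close() must then not close that shared cache.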
One common performance bottleneck, especially with high write volumes, is the frequency and cost of background compactions. RocksDB flushes memtables into SST (Sorted String Table) files, its on-disk data files, and compactions merge those files into larger sorted levels. If compactions can't keep up with the write rate, SST files accumulate: reads must check more files, disk I/O rises, and RocksDB may eventually slow down or stall incoming writes. Tuning writeBufferSize, maxWriteBufferNumber, minWriteBufferNumberToMerge, and levelCompactionDynamicLevelBytes can significantly affect compaction behavior. For instance, larger write buffers mean fewer, larger flushes, which are typically more efficient for RocksDB to manage.
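A quick way to see why larger write buffers reduce flush pressure is to count flushes for a fixed write volume. This is a simplification under stated assumptions: every written byte lands in a memtable, no keys are overwritten within a memtable (overwrites would shrink the flushed data), and each flush merges minWriteBufferNumberToMerge full memtables into one new level-0 SST file.

```java
class FlushEstimate {
    // Approximate number of level-0 SST files produced by flushing bytesWritten,
    // assuming each flush merges minWriteBufferNumberToMerge full memtables.
    static long estimateL0Files(long bytesWritten, long writeBufferBytes, int minWriteBufferNumberToMerge) {
        long bytesPerFlush = writeBufferBytes * minWriteBufferNumberToMerge;
        // Ceiling division: a partially filled final batch still needs a flush eventually.
        return (bytesWritten + bytesPerFlush - 1) / bytesPerFlush;
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024L * 1024L;
        long mib = 1024L * 1024L;
        System.out.println(estimateL0Files(gib, 16 * mib, 1)); // prints 64
        System.out.println(estimateL0Files(gib, 64 * mib, 1)); // prints 16
    }
}
```

Quadrupling the write buffer cuts the number of new level-0 files by four for the same write volume, which gives compaction fewer, larger inputs to work through.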
As you tune RocksDB and add stores, you may also run into the operating system's file descriptor limit. RocksDB keeps SST files open by default, and every store instance has its own set of files. If ulimit -n (the maximum number of open files) is too low, your Kafka Streams application can fail with "Too many open files" errors, which may surface as RocksDB exceptions or general application instability.
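To catch the problem before it becomes a crash, you can watch descriptor usage from inside the JVM. This sketch assumes a HotSpot/OpenJDK JVM on a Unix-like OS, where the platform OperatingSystemMXBean is a `com.sun.management.UnixOperatingSystemMXBean`; the 80% threshold is an arbitrary illustrative choice. Besides raising `ulimit -n`, another lever is capping RocksDB's open files with `Options#setMaxOpenFiles` in your config setter, at some cost to read performance because files must then be reopened.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class FileDescriptorCheck {
    // Fraction of the process's file-descriptor limit currently in use.
    static double usageRatio(long openFds, long maxFds) {
        if (maxFds <= 0) {
            throw new IllegalArgumentException("maxFds must be positive");
        }
        return (double) openFds / maxFds;
    }

    // Illustrative threshold: warn when more than 80% of descriptors are in use.
    static boolean shouldWarn(long openFds, long maxFds) {
        return usageRatio(openFds, maxFds) > 0.8;
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                (com.sun.management.UnixOperatingSystemMXBean) os;
            long open = unix.getOpenFileDescriptorCount();
            long max = unix.getMaxFileDescriptorCount();
            System.out.printf("open=%d max=%d warn=%b%n", open, max, shouldWarn(open, max));
        } else {
            System.out.println("File-descriptor counts are not exposed on this JVM/OS");
        }
    }
}
```

In practice you would export these two numbers as metrics and alert on the ratio rather than print them.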