Kafka Streams, when writing to an output topic, uses an internal Kafka producer. Tuning this producer’s configuration can significantly impact throughput, latency, and data durability.

Let’s see this in action. Imagine a simple Kafka Streams application that doubles every input number and writes it to an output topic.

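import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
// Key and value serdes come from the defaults set in streamsConfig (String keys, Long values)
KStream<String, Long> source = builder.stream("input-topic");

KStream<String, Long> doubled = source.mapValues(value -> value * 2);

// This is where the producer configuration comes into play
doubled.to("output-topic");

// streamsConfig is the Properties object built in the next snippet
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));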

The streamsConfig object is where we’ll inject producer properties. For example, to set acks, linger.ms, and batch.size for the internal producer:

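import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

// Injecting producer-specific configurations
// acks: "1" waits for the partition leader only (producer default before Kafka 3.0; "all" since 3.0)
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "1");
// linger.ms: Kafka Streams defaults its producer to 100 ms, so "50" lowers it
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "50");
// batch.size is in bytes; 16384 (16 KB) is the producer default
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), "16384");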

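StreamsConfig.producerPrefix() is the key here: it prepends producer. to a property name, so ProducerConfig.ACKS_CONFIG becomes producer.acks. Properties carrying this prefix are applied to the producers Kafka Streams creates internally, which write not only to your output topics but also to its internal repartition and changelog topics. The prefix also lets you target the producer alone when the consumer or admin client has a config with the same name.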
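To make the prefix mechanics concrete, here is a stdlib-only sketch of the idea: collect every key that starts with producer. and strip the prefix before the remainder is handed to a producer. The helper producerOverrides is hypothetical and only mimics the behaviour conceptually; Kafka Streams does its own, more involved merging of prefixed settings, unprefixed settings, and its built-in defaults.

```java
import java.util.Properties;

final class ProducerPrefixSketch {
    // The literal that StreamsConfig.producerPrefix(...) prepends (StreamsConfig.PRODUCER_PREFIX).
    static final String PRODUCER_PREFIX = "producer.";

    // Hypothetical helper: keep only "producer."-prefixed keys and strip the prefix,
    // roughly what has to happen before the internal producer can be configured.
    static Properties producerOverrides(Properties streamsConfig) {
        Properties overrides = new Properties();
        for (String key : streamsConfig.stringPropertyNames()) {
            if (key.startsWith(PRODUCER_PREFIX)) {
                overrides.setProperty(key.substring(PRODUCER_PREFIX.length()),
                                      streamsConfig.getProperty(key));
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("application.id", "my-streams-app");
        config.setProperty(PRODUCER_PREFIX + "acks", "all");
        config.setProperty(PRODUCER_PREFIX + "linger.ms", "50");
        // Only acks and linger.ms survive; application.id is not a producer override
        System.out.println(producerOverrides(config));
    }
}
```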

The core problem Kafka Streams solves is stateful stream processing. It allows you to build applications that react to incoming data streams in real time while maintaining state across events, which is crucial for tasks like aggregations, joins, and complex event processing. Internally, Kafka Streams uses Kafka topics for input and output, and also for its own internal repartition and changelog topics (the changelog topics back its state stores). When writing to any of these topics, it is essentially acting as a Kafka producer, so its performance is governed by the same parameters as any standalone producer.

The acks configuration dictates how many acknowledgments the producer requires from the Kafka brokers before considering a record sent. acks=0 means the producer doesn’t wait for any acknowledgment, offering the highest throughput but the least durability: a record can be lost without the producer ever noticing. acks=1 means the producer waits only for the partition leader to acknowledge the write; the record can still be lost if the leader fails before the followers have replicated it. This was the producer default before Kafka 3.0. acks=all (or -1) means the producer waits for all in-sync replicas to acknowledge the write, providing the strongest durability guarantee at the cost of potentially higher latency. It has been the producer default since Kafka 3.0 and is usually paired with a topic or broker setting of min.insync.replicas=2 or higher.
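The trade-off can be summarised as "how many brokers are guaranteed to hold the record at the moment the producer treats it as sent". The toy model below encodes that rule; it is a simplification, not producer code, and it assumes a topic with replication factor 3 and min.insync.replicas=2 (a topic or broker setting, not a producer one). The class and method names are hypothetical.

```java
final class AcksDurabilitySketch {
    // Simplified model: brokers guaranteed to hold a record when the send is acknowledged.
    static int copiesGuaranteedAtAck(String acks, int inSyncReplicas, int minInsyncReplicas) {
        switch (acks) {
            case "0":
                return 0; // the producer does not wait, so nothing is guaranteed
            case "1":
                return 1; // only the partition leader has written the record
            case "all":
            case "-1":
                if (inSyncReplicas < minInsyncReplicas) {
                    // the broker rejects the write (NotEnoughReplicas) instead of acknowledging it
                    throw new IllegalStateException("fewer in-sync replicas than min.insync.replicas");
                }
                return inSyncReplicas; // every current in-sync replica has the record
            default:
                throw new IllegalArgumentException("unknown acks value: " + acks);
        }
    }

    public static void main(String[] args) {
        // Assumed topic: 3 in-sync replicas, min.insync.replicas=2; prints 0, 1 and 3 copies
        for (String acks : new String[] {"0", "1", "all"}) {
            System.out.println("acks=" + acks + " -> " + copiesGuaranteedAtAck(acks, 3, 2) + " copies");
        }
    }
}
```

The rejection branch is why acks=all on its own is not enough: without a min.insync.replicas above 1, an "all" acknowledgment can still come from a single surviving replica.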

linger.ms is the maximum time the producer will wait to gather more records before sending a batch. Increasing this value can improve throughput by allowing more records to be batched together, but it also increases latency. Decreasing it reduces latency but might lead to smaller, less efficient batches. Note that Kafka Streams overrides the plain producer default and uses 100 ms for its producer unless you set linger.ms yourself.
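A back-of-the-envelope estimate shows why linger.ms matters: for one partition, roughly the records that arrive during the linger window end up in the same batch, capped by how many fit in batch.size. The formula and the traffic numbers below (2,000 records per second per partition, 100-byte records) are illustrative assumptions; the model ignores per-record overhead, compression, and the fact that batches also form at linger.ms=0 when the producer is already busy sending.

```java
final class LingerBatchingEstimate {
    // Rough single-partition model: records arriving during linger.ms share a batch,
    // capped by how many records fit in batch.size. Always at least one record per batch.
    static long estimatedRecordsPerBatch(long recordsPerSecond, long lingerMs,
                                         int batchSizeBytes, int recordBytes) {
        long arrivingDuringLinger = recordsPerSecond * lingerMs / 1000;
        long fitInBatch = Math.max(1, batchSizeBytes / recordBytes);
        return Math.max(1, Math.min(arrivingDuringLinger, fitInBatch));
    }

    public static void main(String[] args) {
        // Assumed load: 2,000 records/s per partition, 100-byte records, 16 KB batches.
        // Prints ~1, ~10, ~100 and ~163 records per batch.
        for (long linger : new long[] {0, 5, 50, 100}) {
            System.out.println("linger.ms=" + linger + " -> ~"
                + estimatedRecordsPerBatch(2_000, linger, 16_384, 100) + " records/batch");
        }
    }
}
```

At 100 ms the estimate hits the batch.size cap, which is the point where raising linger.ms further buys latency without improving batching unless batch.size grows too.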

batch.size is the maximum size, in bytes, of a batch of records for a single partition (the default is 16384, i.e. 16 KB). If a batch fills up before linger.ms elapses, it becomes ready to send immediately. Larger batch sizes can improve throughput by reducing the overhead per record, but they also increase memory usage and can lead to longer send times for individual batches.
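The interplay between the two triggers can be simulated for a single partition: a batch leaves either because the next record would not fit in batch.size or because linger.ms has passed since the batch was opened. This is a simplified sketch with hypothetical names and fixed-size records; the real producer also sends early on flush(), close(), or buffer pressure.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-partition model of when a batch becomes ready to send:
// the next record no longer fits in batch.size, or linger.ms has elapsed since the batch opened.
final class BatchTriggerSketch {
    static List<String> sentBatches(long[] arrivalMs, int recordBytes,
                                    int batchSizeBytes, long lingerMs) {
        List<String> sent = new ArrayList<>();
        long openedAt = 0;
        int bytes = 0;
        int count = 0;
        for (long t : arrivalMs) {
            // linger.ms expired before this record arrived: the open batch already went out
            if (count > 0 && t - openedAt >= lingerMs) {
                sent.add("linger:" + count);
                bytes = 0;
                count = 0;
            }
            // this record would overflow batch.size: the open batch is full and is sent
            if (count > 0 && bytes + recordBytes > batchSizeBytes) {
                sent.add("size:" + count);
                bytes = 0;
                count = 0;
            }
            if (count == 0) {
                openedAt = t; // a new batch starts with this record
            }
            bytes += recordBytes;
            count++;
        }
        if (count > 0) {
            sent.add("linger:" + count); // the last batch leaves once linger.ms runs out
        }
        return sent;
    }

    public static void main(String[] args) {
        // Burst of four 300-byte records, then a straggler; batch.size=1000, linger.ms=50
        long[] arrivals = {0, 1, 2, 3, 100};
        System.out.println(sentBatches(arrivals, 300, 1_000, 50)); // [size:3, linger:1, linger:1]
    }
}
```

The burst fills a batch and triggers a size-based send, while the isolated records each wait out the full linger window: that waiting is the latency cost of a large linger.ms on sparse traffic.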

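The buffer.memory configuration sets the total memory, in bytes, that the producer can use to buffer records that haven’t been sent to the Kafka brokers yet (the default is 32 MB). If records are produced faster than they can be delivered and this buffer fills up, send() blocks for up to max.block.ms and then throws a TimeoutException, which slows or fails the processing that triggered the send. It acts as a back-pressure valve that keeps the producer’s memory bounded when the network or brokers can’t keep up.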
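The block-then-fail behaviour can be modelled with a counting semaphore whose permits are bytes: a send reserves space, waits up to max.block.ms for it, and gives up with a timeout. This is a toy model with a hypothetical class name, not the producer’s actual buffer pool, and it uses java.util.concurrent.TimeoutException where the real producer throws Kafka’s own TimeoutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model of buffer.memory + max.block.ms: senders reserve bytes from a fixed pool,
// wait up to maxBlockMs for free space, then fail.
final class BoundedSendBuffer {
    private final Semaphore freeBytes;

    BoundedSendBuffer(int bufferMemoryBytes) {
        this.freeBytes = new Semaphore(bufferMemoryBytes);
    }

    // Called before a record is buffered; blocks while the buffer is full.
    void reserve(int bytes, long maxBlockMs) throws InterruptedException, TimeoutException {
        if (!freeBytes.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("buffer full for " + maxBlockMs + " ms");
        }
    }

    // Called once a batch has been acknowledged and its memory can be reused.
    void release(int bytes) {
        freeBytes.release(bytes);
    }

    int availableBytes() {
        return freeBytes.availablePermits();
    }
}
```

With a 100-byte pool, reserving 80 bytes succeeds, a following 30-byte reservation times out, and it succeeds again once the first 80 bytes are released: the same pattern a stalled broker produces at much larger scale.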

compression.type allows you to compress the data sent to Kafka. Options include none, gzip, snappy, lz4, and zstd. Compression is applied per batch, so larger batches usually compress better. It reduces network bandwidth usage and storage requirements on Kafka brokers but adds CPU overhead on both the producer side (compression) and the consumer side (decompression). snappy, lz4, or zstd often provide a good balance of compression ratio and CPU cost.
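To get a feel for the savings on repetitive payloads, the sketch below gzip-compresses a batch of similar JSON-like records and compares sizes. It uses gzip only because it ships with the JDK (snappy, lz4, and zstd need third-party libraries); the record format is made up for illustration, and real ratios depend entirely on your data.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Rough illustration: compress a whole "batch" of similar records at once,
// the way the producer compresses per batch, and compare raw vs compressed size.
final class CompressionRatioSketch {
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    // Hypothetical record format: small JSON objects with repeating keys.
    static byte[] sampleBatch(int records) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < records; i++) {
            sb.append("{\"key\":\"sensor-").append(i % 10)
              .append("\",\"value\":").append(i * 2).append("}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = sampleBatch(500);
        byte[] compressed = gzip(raw);
        System.out.printf("raw=%d bytes, gzip=%d bytes (%.1fx smaller)%n",
            raw.length, compressed.length, (double) raw.length / compressed.length);
    }
}
```

Trying sampleBatch(5) against sampleBatch(500) shows why compression pairs naturally with larger batch.size and linger.ms values: tiny batches leave the compressor little repetition to exploit.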

The max.in.flight.requests.per.connection setting controls how many unacknowledged requests a producer can have in flight to a single broker connection. Increasing it can improve throughput by allowing more concurrent requests. Without idempotence, however, any value above 1 can reorder messages: if one request fails and is retried while a later request succeeds, the later records land first. With enable.idempotence=true (the producer default since Kafka 3.0, and always the case under Kafka Streams’ exactly-once processing), the broker uses sequence numbers to preserve ordering for up to 5 in-flight requests per connection, so keep the value at 5 or below if you rely on that guarantee.
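The reordering hazard is easy to reproduce in a simplified model with idempotence disabled: the producer sends up to maxInFlight batches per round, the broker appends them in send order, and a batch whose first attempt fails is retried at the head of the next round. The round-based timing and all names are assumptions made for illustration; the idempotent producer avoids this by having the broker reject out-of-sequence batches.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class InFlightReorderingSketch {
    // Simplified model, idempotence disabled: up to maxInFlight batches per round,
    // appended in send order; failOnceBatch fails its first attempt and is retried next round.
    static List<Integer> brokerLogOrder(int maxInFlight, int batchCount, int failOnceBatch) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 1; i <= batchCount; i++) {
            pending.addLast(i);
        }
        List<Integer> log = new ArrayList<>();
        boolean failed = false;
        while (!pending.isEmpty()) {
            List<Integer> round = new ArrayList<>();
            while (round.size() < maxInFlight && !pending.isEmpty()) {
                round.add(pending.pollFirst());
            }
            List<Integer> retries = new ArrayList<>();
            for (int batch : round) {
                if (batch == failOnceBatch && !failed) {
                    failed = true;      // transient failure on the first attempt
                    retries.add(batch);
                } else {
                    log.add(batch);     // later in-flight batches still succeed
                }
            }
            // Retries go back to the front of the queue, keeping their relative order
            for (int i = retries.size() - 1; i >= 0; i--) {
                pending.addFirst(retries.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(brokerLogOrder(1, 3, 1)); // [1, 2, 3]: order preserved
        System.out.println(brokerLogOrder(2, 3, 1)); // [2, 1, 3]: batch 2 overtook batch 1
    }
}
```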

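Finally, retries, delivery.timeout.ms, and max.block.ms are crucial for resilience. retries lets the producer automatically resend failed requests; since Kafka 2.1 it defaults to a very large value (Integer.MAX_VALUE), and the effective limit is delivery.timeout.ms (default 120 seconds), the total time allowed for a record to be acknowledged, including all retries. max.block.ms (default 60 seconds) is the maximum time that the send() method will block if the buffer is full or metadata is unavailable. Properly configuring these ensures that transient network issues or brief broker unavailability don’t immediately halt your stream processing.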
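As a sketch of how these settings fit together, the hypothetical helper below builds producer.-prefixed resilience overrides as plain strings (so it runs without Kafka on the classpath) and checks the documented constraint that delivery.timeout.ms must be at least linger.ms plus request.timeout.ms. The values in main are the producer defaults plus the 100 ms linger that Kafka Streams uses; treat them as a starting point, not a recommendation.

```java
import java.util.Properties;

// Hypothetical helper: "producer."-prefixed resilience overrides for a Streams config,
// validating delivery.timeout.ms >= linger.ms + request.timeout.ms up front.
final class ResilienceOverridesSketch {
    static Properties resilienceOverrides(long deliveryTimeoutMs, long maxBlockMs,
                                          long lingerMs, long requestTimeoutMs) {
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        Properties p = new Properties();
        // Total time budget per record, retries included; retries itself can stay at its default
        p.setProperty("producer.delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        // How long send() may block on a full buffer or missing metadata
        p.setProperty("producer.max.block.ms", Long.toString(maxBlockMs));
        p.setProperty("producer.linger.ms", Long.toString(lingerMs));
        p.setProperty("producer.request.timeout.ms", Long.toString(requestTimeoutMs));
        return p;
    }

    public static void main(String[] args) {
        // Producer defaults (120 s, 60 s, 30 s) plus Kafka Streams' 100 ms linger
        Properties overrides = resilienceOverrides(120_000, 60_000, 100, 30_000);
        overrides.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Setting a bounded delivery.timeout.ms, rather than tuning retries directly, gives each record a clear deadline after which the failure surfaces to your application.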

When you tune these producer configurations, you’re directly influencing how efficiently and reliably your Kafka Streams application writes data to its output and internal topics. They are the main levers for trading off latency, throughput, and durability at the point where your pipeline hands data to the brokers.

The next challenge you’ll likely encounter is optimizing the consumer side of your Kafka Streams application, particularly when dealing with stateful operations and ensuring exactly-once processing guarantees.
