Kafka Streams repartitioning is a necessary evil when your processing logic requires changing the key of your data. The default behavior can incur significant cost.
Here’s a Kafka Streams app that demonstrates the issue; the strategies for reducing its cost follow:
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class RepartitionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-demo-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();

        // Input stream with String keys
        KStream<String, String> inputStream = builder.stream("input-topic");

        // Value-only transformation: the key is untouched, so nothing needs to be repartitioned yet
        KStream<String, String> upperCased = inputStream
            .mapValues(value -> value.toUpperCase());

        // Derive a new key from the value. selectKey marks the stream as
        // "repartition required" for any key-based operation downstream.
        KStream<String, String> rekeyedStream = upperCased
            .selectKey((key, value) -> "user-" + value.split("-")[0]);

        rekeyedStream
            .groupByKey() // This is where the repartitioning happens implicitly
            .count()
            .toStream()
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Register the shutdown hook before starting so the app is always closed cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```
The Problem:
In `RepartitionDemo` above, `selectKey` followed by `groupByKey()` forces Kafka Streams to repartition the data. Records that now share a key may sit on different partitions, while the aggregation needs every record for a given key to arrive at the same task. Kafka Streams must therefore:
- Write every rekeyed record to an intermediate topic (the repartition topic), partitioned by the new key.
- Read the records back from the repartition topic.
This adds one extra serialization, one extra deserialization, and a full produce/consume round trip through the brokers for every record, which can become a significant bottleneck and cost driver on high-volume topics. Kafka Streams purges records from repartition topics after they have been processed, but at high throughput the topic still consumes broker disk space and network bandwidth.
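To see why a key change forces a shuffle, here is a minimal, broker-free sketch. It assumes a simplified partitioner (`Math.floorMod(key.hashCode(), numPartitions)`) as a stand-in for Kafka's default murmur2-based one, and the class name, partition count, and sample records are invented for illustration. It shows that records sharing a derived key can start out spread over several partitions, and that placing them by the new key brings them together.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Broker-free illustration; nothing here is Kafka Streams API.
class RekeyShuffleSketch {
    static final int NUM_PARTITIONS = 4; // hypothetical partition count

    // Simplified stand-in for a partitioner (Kafka's default hashes the serialized key with murmur2).
    static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    // Same derivation as the selectKey lambda in RepartitionDemo.
    static String deriveKey(String value) {
        return "user-" + value.split("-")[0];
    }

    // Made-up records: original key -> value.
    static Map<String, String> sampleRecords() {
        Map<String, String> records = new LinkedHashMap<>();
        records.put("k0", "ALICE-LOGIN");
        records.put("k1", "ALICE-CLICK");
        records.put("k2", "BOB-LOGIN");
        records.put("k3", "ALICE-LOGOUT");
        return records;
    }

    // Partitions holding each derived key. With byNewKey == false, records stay where
    // their ORIGINAL key put them; with byNewKey == true, they are placed by the derived key.
    static Map<String, Set<Integer>> partitionsPerDerivedKey(Map<String, String> records, boolean byNewKey) {
        Map<String, Set<Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records.entrySet()) {
            String newKey = deriveKey(record.getValue());
            int partition = partitionFor(byNewKey ? newKey : record.getKey());
            result.computeIfAbsent(newKey, k -> new TreeSet<>()).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> records = sampleRecords();
        System.out.println("before shuffle: " + partitionsPerDerivedKey(records, false));
        System.out.println("after shuffle:  " + partitionsPerDerivedKey(records, true));
    }
}
```

Before the shuffle, the records for `user-ALICE` are spread over several partitions, so no single task could count them. After the shuffle, each derived key maps to exactly one partition, which is the guarantee `groupByKey()` relies on.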
The Cost of Repartitioning:
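When Kafka Streams needs to repartition, it creates an internal topic named `<application.id>-<operator-name>-repartition`. The operator name is either auto-generated (for example `KSTREAM-AGGREGATE-STATE-STORE-0000000003`) or a name you supply through `Grouped.as(...)` or `Repartitioned.as(...)`. For every record that needs repartitioning, Kafka Streams performs these steps: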
- Serialization: The record’s new key and value are serialized.
- Produce: The serialized record is sent to the repartition topic.
- Consume: The record is read from the repartition topic.
- Deserialization: The record’s new key and value are deserialized.
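- Processing: The record is then processed by the downstream operation (e.g., `groupByKey`, `join`).
The produce and consume steps are network round trips to the brokers, and every produced record is also replicated according to the topic's replication factor, so the overhead grows with both record volume and record size.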
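A back-of-the-envelope estimate makes this overhead concrete. The sketch below assumes every rekeyed record is written once to the repartition topic, copied to each replica, and read back once. The throughput, record size, and replication factor are hypothetical inputs, and compression, batching, and protocol overhead are ignored, so treat the result as an order-of-magnitude figure only.

```java
// Rough cost model for one repartition topic; all inputs are hypothetical.
class RepartitionCostSketch {

    // Bytes per second written to broker disks: one copy per replica.
    static long diskWriteBytesPerSec(long recordsPerSec, long avgRecordBytes, int replicationFactor) {
        return recordsPerSec * avgRecordBytes * replicationFactor;
    }

    // Bytes per second crossing the network:
    // producer -> leader, leader -> each follower, leader -> consuming task.
    static long networkBytesPerSec(long recordsPerSec, long avgRecordBytes, int replicationFactor) {
        long payload = recordsPerSec * avgRecordBytes;
        long produce = payload;
        long replicate = payload * (replicationFactor - 1);
        long consume = payload;
        return produce + replicate + consume;
    }

    public static void main(String[] args) {
        long recordsPerSec = 50_000;  // hypothetical throughput
        long avgRecordBytes = 1_000;  // hypothetical serialized key + value size
        int replicationFactor = 3;    // hypothetical

        long diskMb = diskWriteBytesPerSec(recordsPerSec, avgRecordBytes, replicationFactor) / 1_000_000;
        long netMb = networkBytesPerSec(recordsPerSec, avgRecordBytes, replicationFactor) / 1_000_000;
        System.out.println("extra disk writes: " + diskMb + " MB/s"); // prints "extra disk writes: 150 MB/s"
        System.out.println("extra network:     " + netMb + " MB/s");  // prints "extra network:     200 MB/s"
    }
}
```

Each additional repartition topic fed from the same data multiplies these figures, which is why the number of shuffles matters as much as the size of each record.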
Minimizing Repartition Cost:
The core idea is to repartition as rarely as possible and to push as little data as possible through each repartition: change keys only when a downstream operation truly needs a different key, shrink or filter records before they are rekeyed, and make sure each rekeyed stream is shuffled only once.
Here are the common strategies to minimize repartition cost:
- Combine `selectKey` and `groupBy` Operations: `groupBy(...)` is shorthand for `selectKey(...)` followed by `groupByKey()`, so use it when the new key exists only for the grouping. It still triggers a repartition (any key-changing operation followed by a key-based operation does), but it keeps the rekeyed stream from being reused by several stateful operators, each of which would otherwise create its own repartition topic.
  - Diagnosis: Look for a `selectKey` result that is assigned to a variable and then fed into more than one `groupByKey`, `aggregate`, or `join`, and for filters or value-shrinking transformations that run only after the repartition.
  - Fix: Do the value-only work first, then rekey and group in a single step.
    ```java
    // Instead of:
    // KStream<String, String> rekeyed = inputStream.selectKey((k, v) -> "user-" + v.split("-")[0]);
    // rekeyed.groupByKey().count();
    // rekeyed.groupByKey().reduce((a, b) -> b); // second repartition topic for the same data

    // Do this: value-only work first, then rekey and group in one step
    inputStream
        .mapValues(value -> value.toUpperCase())
        .groupBy((key, value) -> "user-" + value.split("-")[0])
        .count();
    ```
  - Why it works: Value-only operations such as `mapValues` and `filter` never mark a stream for repartitioning, so their position relative to `selectKey` does not decide whether a shuffle happens. What changes the cost is how many records and bytes cross the repartition topic and how many repartition topics exist. If one rekeyed stream must feed several aggregations, setting `topology.optimization` to `all` (`StreamsConfig.OPTIMIZE`) and passing the properties to `builder.build(props)` lets Kafka Streams merge the duplicate repartition topics into one.
- Use `KTable.groupBy` for Aggregations: When the input has update semantics (each record is the latest value for its key) and you need an aggregate over a derived key, read it as a `KTable` and use `KTable.groupBy`, which rekeys and groups in one step. `KTable.groupBy` always repartitions, so the gain is correct aggregates with fewer operators, not a skipped shuffle.
  - Diagnosis: You’re using `selectKey` plus `KStream.groupByKey()` to aggregate data that is really a changelog, so an updated record is counted again instead of replacing its previous contribution.
  - Fix: Read the topic as a `KTable` and group it by the derived key.
    ```java
    // Requires org.apache.kafka.streams.KeyValue and org.apache.kafka.streams.kstream.KTable
    KTable<String, String> inputTable = builder.table("input-topic");
    KTable<String, Long> countsByDerivedKey = inputTable
        .groupBy((key, value) -> KeyValue.pair("user-" + value.split("-")[0], value)) // always repartitions
        .count();
    countsByDerivedKey.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
    ```
  - Why it works: When a table row changes, `KTable.groupBy` removes the old value from its previous group and adds the new value to its new group, so the aggregate stays consistent with the latest state. For a plain event stream, `groupByKey().count()` on the rekeyed `KStream` already returns a `KTable`, and converting back and forth adds nothing. The only grouping that needs no repartition at all is `groupByKey()` on a stream whose key was never changed.
- Explicitly Define Repartition Topics (Advanced): In rare, highly performance-sensitive scenarios, you might want to control the repartition topic's name, number of partitions, and replication factor yourself. `StreamsConfig` has no repartition-prefix setting; the controls are split between the DSL and the configuration.
  - Diagnosis: You have identified repartitioning as a bottleneck and want finer control over the intermediate topics.
  - Fix: Name the topic and set its partition count with `KStream.repartition(Repartitioned...)` (available since Kafka 2.6), or just name it with `Grouped.as(...)`. Set the replication factor of all internal topics with `StreamsConfig.REPLICATION_FACTOR_CONFIG`, and pass topic-level settings such as `min.insync.replicas` through `StreamsConfig.topicPrefix(...)`.
    ```java
    // "by-user" and 12 are example values; the topic becomes <application.id>-by-user-repartition
    inputStream
        .selectKey((key, value) -> "derived-key")
        .repartition(Repartitioned.<String, String>as("by-user").withNumberOfPartitions(12))
        .groupByKey()
        .count();
    ```
    Without an explicit count, the number of partitions of the repartition topic is derived from the upstream source topic: if your source topic has 10 partitions, the repartition topic will also have 10 partitions.
  - Why it works: This gives you a predictable naming scheme for debugging, keeps topic names stable when the topology changes, and lets you size the shuffle independently of the input topic. However, it doesn’t eliminate the repartitioning itself, just controls its manifestation.
- Avoid Unnecessary `selectKey`: The most straightforward way to avoid repartitioning cost is to avoid operations that trigger it if they aren’t strictly necessary.
  - Diagnosis: You see `selectKey` or other key-changing operations (`map`, or `flatMap` returning records with new keys) followed by `groupByKey`, `join`, or `aggregate`. Kafka Streams does not compare old and new keys: these operations mark the stream for repartitioning even when the key comes out unchanged.
  - Fix: Re-evaluate your processing logic. Can the downstream operation work with the existing key? If only the value changes, use `mapValues` or `flatMapValues` instead of `map` or `flatMap`.
    ```java
    // If you were doing this:
    // inputStream
    //     .map((k, v) -> KeyValue.pair(k, v.toUpperCase())) // Key unchanged, but still marked for repartition!
    //     .groupByKey()
    //     .count();

    // Do this when the existing key is already the grouping key:
    inputStream
        .mapValues(v -> v.toUpperCase()) // Value-only: no repartition
        .groupByKey()
        .count();
    ```
  - Why it works: No repartitioning happens if no key-changing operation precedes the key-based operation, because the data is already partitioned by the key that the grouping or join needs.
- Use `KStream.through()` with Explicit Topic: `through()` writes the stream to a topic you name and reads it back, which lets you choose the topic name and partition count yourself. Note that `through()` has been deprecated since Kafka 2.6 in favor of `repartition()`, which creates and manages the topic for you; prefer `repartition()` on newer versions.
  - Diagnosis: You’ve identified a `selectKey` followed by `groupByKey` or similar, and you want explicit control over the intermediate topic.
  - Fix: Create the intermediate topic yourself, then use `through()` after `selectKey`.
    ```java
    KStream<String, String> inputStream = builder.stream("input-topic");
    KStream<String, String> rekeyedStream = inputStream
        .selectKey((key, value) -> "derived-key");

    // Instead of a direct groupByKey(), which creates a default repartition topic:
    // rekeyedStream.groupByKey().count();

    // Route the data through a topic you created and sized before starting the app
    KStream<String, String> intermediateStream = rekeyedStream
        .through("my-intermediate-repartition-topic", Produced.with(Serdes.String(), Serdes.String()));

    intermediateStream
        .groupByKey() // Reads from "my-intermediate-repartition-topic"; no second repartition
        .count()
        .toStream()
        .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
    ```
  - Why it works: `through()` is equivalent to `to(topic)` followed by `builder.stream(topic)`. The records are written with the new key, so they are already partitioned correctly when read back and the following `groupByKey()` adds no internal repartition topic. Kafka Streams neither creates nor sizes "my-intermediate-repartition-topic": you must create it with the partition count you want before starting the application, and because it is a regular user topic, its retention is your responsibility too.
- Leverage `KStream`-`KTable` Joins: If your goal is to enrich data from one stream with data from another (which often involves joining on a derived key), consider whether a `KStream`-`KTable` join is appropriate. If the "lookup" data is already in a `KTable` (or can be materialized into one), you can avoid repartitioning the stream that’s doing the lookup.
  - Diagnosis: You are performing a windowed `join` between two `KStream`s where one needs to be repartitioned to match the key of the other, and one side is really lookup data.
  - Fix: Materialize the lookup stream into a `KTable` and use a `KStream`-`KTable` join.
    ```java
    KStream<String, String> mainStream = builder.stream("main-input");
    KStream<String, String> lookupStream = builder.stream("lookup-input");

    // Rekey the lookup data once and keep the latest value per key
    KTable<String, String> lookupTable = lookupStream
        .selectKey((k, v) -> "lookup-key")
        .groupByKey()               // The repartition of the lookup data happens here
        .reduce((agg, val) -> val); // Materialize into a KTable

    mainStream
        .join(lookupTable, (value1, value2) -> value1 + ":" + value2) // Stream-table joins take no JoinWindows
        .to("output-topic");
    ```
  - Why it works: A `KStream`-`KTable` join is a non-windowed lookup against the table's local state store. Both sides must be co-partitioned (same key, same number of partitions), so the `KStream` side is repartitioned only if its key was changed to match the `KTable`’s key. The `KTable` is rekeyed once when it is built and then served from local state (backed by its changelog topic), instead of shuffling both sides as in a stream-stream join on a derived key.
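As a rough mental model of the `KStream`-`KTable` join described above (not the Kafka Streams implementation), the table side behaves like a per-task map holding the latest value per key, and each stream record performs one local lookup. The `TableJoinSketch` class and the sample data are invented for illustration, and a `HashMap` stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Mental model only; a HashMap stands in for the KTable's local state store.
class TableJoinSketch {
    private final Map<String, String> table = new HashMap<>();

    // Table side: each update overwrites the previous value for the key; null deletes it.
    void upsert(String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Stream side (inner join): one local lookup per record, no output if the key is absent.
    Optional<String> join(String key, String streamValue) {
        return Optional.ofNullable(table.get(key))
            .map(tableValue -> streamValue + ":" + tableValue);
    }

    public static void main(String[] args) {
        TableJoinSketch task = new TableJoinSketch();
        task.upsert("user-1", "gold");
        task.upsert("user-1", "platinum"); // latest value wins
        System.out.println(task.join("user-1", "click")); // prints Optional[click:platinum]
        System.out.println(task.join("user-2", "click")); // prints Optional.empty
    }
}
```

The lookup is only local if the stream record for a key is processed by the same task that holds that key's slice of the table, which is why the two sides must be co-partitioned.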
The next concept you’ll likely encounter after optimizing repartitioning is understanding Kafka Streams’ internal state management and how it impacts fault tolerance and exactly-once processing.