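Repartitioning in Kafka Streams is a necessary evil whenever your processing logic changes the record key ahead of a key-based operation such as an aggregation or a join. Left at its defaults, it can add significant cost in broker traffic, disk usage, and latency.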

Here’s a Kafka Streams app that demonstrates the issue:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class RepartitionDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-demo-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();

        // Input stream with String keys
        KStream<String, String> inputStream = builder.stream("input-topic");

        // Value-only transformation: the key is untouched, so this step never triggers a repartition
        KStream<String, String> upperCased = inputStream
                .mapValues(value -> value.toUpperCase());

        // Now, let's imagine we need to group by a derived key.
        // selectKey flags the stream as needing a repartition before any key-based operation.
        KStream<String, String> rekeyedStream = upperCased
                .selectKey((key, value) -> "user-" + value.split("-")[0]); // Key transformation

        rekeyedStream
                .groupByKey() // This is where the repartitioning happens implicitly
                .count()
                .toStream()
                .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The Problem:

In the RepartitionDemo above, the selectKey operation followed by groupByKey() forces Kafka Streams to repartition the data. This means Kafka Streams must:

  1. Write the data to an intermediate topic (the repartition topic).
  2. Read the data from the repartition topic.

This round trip costs one extra serialization and one extra deserialization per record, plus a network hop to the brokers and back. It can be a significant performance bottleneck and cost driver, especially with high-volume topics. The intermediate topic also consumes broker disk space and network bandwidth until its consumed records are purged.
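To see why the round trip cannot simply be skipped, here is a minimal, dependency-free sketch of what happens to record placement when the key changes. It assumes a simplified partitioner (String.hashCode() modulo the partition count) as a stand-in for Kafka's default one, which hashes the serialized key bytes with murmur2; the class name, the sample records, and the partition count of 4 are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class RekeyPartitionSketch {

    // Stand-in partitioner (assumption): hash of the key modulo the partition count.
    // Kafka's default partitioner uses murmur2 over the serialized key bytes instead.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Same rekeying rule as the selectKey call in RepartitionDemo.
    static String deriveKey(String value) {
        return "user-" + value.split("-")[0];
    }

    // For each derived key, the partitions its records sit in while the data
    // is still distributed by the ORIGINAL key.
    static Map<String, Set<Integer>> partitionsBeforeShuffle(Map<String, String> records,
                                                             int numPartitions) {
        Map<String, Set<Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records.entrySet()) {
            int partition = partitionFor(record.getKey(), numPartitions);
            result.computeIfAbsent(deriveKey(record.getValue()), k -> new TreeSet<>())
                  .add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> records = new LinkedHashMap<>();
        records.put("order-1", "ALICE-BOOK");
        records.put("order-2", "ALICE-PEN");
        records.put("order-3", "BOB-LAMP");

        // Before the shuffle, the two user-ALICE records sit in two different
        // partitions, so no single task sees both of them.
        System.out.println(partitionsBeforeShuffle(records, 4));

        // After the shuffle, placement depends on the derived key alone,
        // so both records reach the same partition.
        System.out.println(partitionFor(deriveKey("ALICE-BOOK"), 4)
                == partitionFor(deriveKey("ALICE-PEN"), 4)); // prints true
    }
}
```

A count per user is only correct if one task sees every record for that user, which is exactly what writing the records back to Kafka under the new key guarantees.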

The Cost of Repartitioning:

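When Kafka Streams needs to repartition, it creates an internal topic named <application.id>-<operator name>-repartition. The operator name is either generated (for example KSTREAM-AGGREGATE-STATE-STORE-0000000003) or supplied by you through Grouped or Repartitioned. For every record that needs repartitioning, Kafka Streams performs these steps: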

  1. Serialization: The record’s new key and value are serialized.
  2. Produce: The serialized record is sent to the repartition topic.
  3. Consume: The record is read from the repartition topic.
  4. Deserialization: The record’s new key and value are deserialized.
  5. Processing: The record is then processed by the downstream operation (e.g., groupByKey, join).

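Steps 1 to 4 exist only because of the repartition. Without it, the record would be handed to the downstream operation in memory, with no broker involved.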
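A back-of-the-envelope estimate makes the steps above concrete. The sketch below is a rough model, not a measurement: it counts only key and value payload bytes as String serdes would encode them, ignores record headers, batching, and compression, and assumes the repartition topic is replicated like any other topic. The throughput of 50,000 records per second and the replication factor of 3 are hypothetical inputs.

```java
import java.nio.charset.StandardCharsets;

final class RepartitionCostSketch {

    // Payload size of one record with String serdes: UTF-8 key plus UTF-8 value.
    static long payloadBytes(String key, String value) {
        return key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;
    }

    // Extra broker traffic caused by one repartition hop, in bytes per second.
    static long extraBrokerBytesPerSecond(long recordsPerSecond,
                                          long bytesPerRecord,
                                          int replicationFactor) {
        long produced = recordsPerSecond * bytesPerRecord;    // step 2: write to the repartition topic
        long replicated = produced * (replicationFactor - 1); // copies sent to follower replicas
        long consumed = produced;                              // step 3: read back by the next task
        return produced + replicated + consumed;
    }

    public static void main(String[] args) {
        long bytesPerRecord = payloadBytes("user-ALICE", "ALICE-BOOK"); // 10 + 10 = 20 bytes
        long extra = extraBrokerBytesPerSecond(50_000, bytesPerRecord, 3);
        System.out.println(extra); // prints 4000000
    }
}
```

Even with tiny records, one hop quadruples the payload traffic in this model, which is why trimming records and values before the shuffle pays off.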

Minimizing Repartition Cost:

The core idea is to repartition as few times as possible, to push as little data as possible through each repartition topic, and to skip key changes that the downstream operation does not actually need.

Here are the common strategies to minimize repartition cost:

  1. Combine selectKey and groupBy Operations: If your goal is to group by a key that’s derived from the value, rekey once, directly before the groupBy or aggregate operation, and do your filtering and value trimming upstream of that point. Placement alone does not remove the repartition: selectKey flags the stream as needing one, and value-only steps such as mapValues or filter in between neither set nor clear that flag. What you control is how much data crosses the repartition topic and how many repartition topics are created.

    • Diagnosis: Look for a rekeyed stream that feeds several groupByKey, aggregate, or join operations (by default each one gets its own repartition topic), and for records or value fields that are discarded only after the shuffle. Printing builder.build().describe() lists every topic whose name ends in -repartition.
    • Fix: Rearrange your DSL operations.
      // Instead of:
      // KStream<String, String> rekeyed = inputStream.selectKey((k, v) -> "new-key");
      // rekeyed.groupByKey().count(); // Every record, wanted or not, crosses the repartition topic
      
      // Do this:
      KStream<String, String> rekeyedAndFiltered = inputStream
              .filter((key, value) -> value != null && !value.isEmpty()) // Drop unwanted records before the shuffle
              .selectKey((key, value) -> "new-key-derived-from-value"); // Rekey once, directly before grouping
      rekeyedAndFiltered.groupByKey().count();
      
    • Why it works: The repartition topic is written at the groupByKey, so every stateless step upstream of it runs before the shuffle, and records removed there never reach the brokers. In addition, with topology.optimization set to all (and the same Properties passed to builder.build(props)), Kafka Streams can merge the repartition topics that several downstream operations would otherwise each create after a single key change.
  2. Use KStream.groupBy for Aggregations: When performing aggregations (like count, reduce, aggregate) on a KStream where the key needs to change, fold the rekeying into groupBy instead of writing a separate selectKey followed by groupByKey. The cost is the same single repartition, but the intent is explicit and Grouped lets you name the repartition topic and set its serdes.

    • Diagnosis: You’re using KStream.groupByKey() on a stream that has undergone a selectKey transformation, and you’re performing an aggregation.
    • Fix: Rekey and group in one call.
      KStream<String, String> inputStream = builder.stream("input-topic");
      
      // Instead of: inputStream.selectKey((key, value) -> "derived-key").groupByKey().count()
      // Do this:
      KTable<String, Long> aggregatedTable = inputStream
              .groupBy((key, value) -> "derived-key", // Rekey and group in one step
                       Grouped.with("by-derived-key", Serdes.String(), Serdes.String()))
              .count(); // This returns a KTable
      
      aggregatedTable
              .toStream() // Convert back if needed
              .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
      
    • Why it works: groupBy is selectKey plus groupByKey in a single operator, so the topology contains exactly one key change and one repartition topic, here named <application.id>-by-derived-key-repartition. The resulting KTable is already partitioned by the new key, so later operations keyed the same way need no further shuffle. Note that KTable.groupBy always repartitions, so converting to a KTable before regrouping does not save anything.
  3. Explicitly Define Repartition Topics (Advanced): In rare, highly performance-sensitive scenarios, you might want to control the repartition topic name, number of partitions, and replication factor yourself. The name and partition count are set in the DSL through Grouped or Repartitioned; the replication factor and topic-level settings are set through Kafka Streams configuration.

    • Diagnosis: You have identified repartitioning as a bottleneck and want finer control over the intermediate topics.
    • Fix: Name the topic in the DSL and configure internal topics through StreamsConfig. There is no configuration key for a repartition topic prefix; the prefix is always the application.id.
      props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); // Applies to all internal topics
      props.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), "2");
      // rekeyedStream is a stream that has already been through selectKey (Kafka Streams 2.6+):
      rekeyedStream.repartition(Repartitioned.<String, String>as("by-user").withNumberOfPartitions(12));
      
      By default, the number of partitions for a repartition topic follows the largest partition count among the source topics feeding it. If your source topic has 10 partitions, the repartition topic will also have 10 partitions unless you override it with withNumberOfPartitions.
    • Why it works: This gives you a predictable naming scheme for debugging and lets you apply specific configurations (e.g., min.insync.replicas) to the internal topics Kafka Streams creates. However, it doesn’t eliminate the repartitioning itself, just controls its manifestation.
  4. Avoid Unnecessary selectKey: The most straightforward way to avoid repartitioning cost is to avoid operations that trigger it if they aren’t strictly necessary.

    • Diagnosis: You see selectKey or another operation that can change the key (map, flatMap, or a transformer that emits new keys) followed by groupByKey, join, or aggregate.
    • Fix: Re-evaluate your processing logic. Can the downstream operation work with the existing key? Can you achieve the same result without changing the key?
      // If you were doing this:
      // inputStream
      //     .selectKey((k, v) -> extractNewKey(v)) // Repartition!
      //     .groupByKey()
      //     .count();
      
      // Consider if you can use a different approach, e.g., a Processor API
      // or a different stream-stream join pattern if that's the goal.
      
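    • Why it works: No repartitioning happens if no key-changing operation precedes the key-based one. Kafka Streams does not inspect the keys your function returns: selectKey, map, and flatMap flag the stream for repartitioning even when the new key equals the old one, so removing a redundant key change removes the repartition topic with it.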
  5. Use KStream.repartition() with an Explicit Topic: When you need to send data through an intermediate topic and then process it, KStream.repartition() is the DSL method that does so explicitly, and it lets you control the topic name and partition count. It replaces KStream.through(), which has been deprecated since Kafka Streams 2.6 and required you to create the intermediate topic yourself.

    • Diagnosis: You’ve identified a selectKey followed by groupByKey or similar, and you want explicit control over the intermediate topic.
    • Fix: Use repartition() after selectKey.
      KStream<String, String> inputStream = builder.stream("input-topic");
      
      KStream<String, String> rekeyedStream = inputStream
              .selectKey((key, value) -> "derived-key");
      
      // Instead of a direct groupByKey, which creates a repartition topic with a generated name:
      // rekeyedStream.groupByKey().count();
      
      // Use repartition() to control the intermediate topic (Kafka Streams 2.6+):
      KStream<String, String> intermediateStream = rekeyedStream.repartition(
              Repartitioned.<String, String>as("my-intermediate")
                      .withKeySerde(Serdes.String())
                      .withValueSerde(Serdes.String()));
      
      // Now operate on the repartitioned stream
      intermediateStream
              .groupByKey() // Reads from <application.id>-my-intermediate-repartition; no second shuffle
              .count()
              .toStream()
              .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
      
    • Why it works: repartition() is the explicit DSL method for writing to a topic and reading back from it. Kafka Streams creates and manages the topic as <application.id>-my-intermediate-repartition, and the stream it returns is no longer flagged for repartitioning, so the groupByKey that follows does not shuffle the data a second time. The partition count follows the upstream topics unless you set it with withNumberOfPartitions.
  6. Leverage KStream-KTable Joins: If your goal is to enrich data from one stream with data from another (which often involves joining on a derived key), consider if a KStream-KTable join is appropriate. If the "lookup" data is already in a KTable (or can be materialized into one), you can avoid repartitioning the stream that’s doing the lookup.

    • Diagnosis: You are performing a join operation between two KStreams where one needs to be repartitioned to match the key of the other.
    • Fix: Materialize one of the streams into a KTable and use a KStream-KTable join.
      KStream<String, String> mainStream = builder.stream("main-input");
      KStream<String, String> lookupStream = builder.stream("lookup-input");
      
      // If lookupStream needs rekeying before join:
      KTable<String, String> lookupTable = lookupStream
              .selectKey((k, v) -> "lookup-key") // Flags lookupStream for repartitioning
              .groupByKey() // The repartition topic for lookupStream is created here
              .reduce((agg, val) -> val); // Keep the latest value per key, materialized as a KTable
      
      mainStream
              .join(lookupTable, (value1, value2) -> value1 + ":" + value2) // KStream-KTable joins take no JoinWindows
              .to("output-topic");
      
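    • Why it works: A KStream-KTable join needs both sides co-partitioned: the same key and the same number of partitions. The lookup data is rekeyed once, when it is materialized into the KTable, and each stream task then joins against its local state store (backed by a changelog topic) instead of shuffling lookup data for every join. mainStream needs repartitioning only if its own key does not already match the table’s key. If the lookup data is small, a GlobalKTable, which is replicated in full to every instance and joined through a key-mapping function, removes the co-partitioning requirement altogether.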
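The co-partitioning requirement behind strategy 6 can be illustrated without a broker. The sketch below again assumes a simplified partitioner (String.hashCode() modulo the partition count) in place of Kafka's murmur2-based default; the class name, the single-letter keys, and the partition counts are illustrative only.

```java
import java.util.Arrays;
import java.util.List;

final class CoPartitionSketch {

    // Stand-in partitioner (assumption): hash of the key modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True when every key lands on the same partition number on both sides,
    // which is what lets a task join a stream record against its local table state.
    static boolean joinableLocally(List<String> keys, int streamPartitions, int tablePartitions) {
        for (String key : keys) {
            if (partitionFor(key, streamPartitions) != partitionFor(key, tablePartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");

        // Equal partition counts: every key maps to the same partition on both sides.
        System.out.println(joinableLocally(keys, 4, 4)); // prints true

        // Different counts: "d" hashes to 100, which is partition 0 of 4 but partition 4 of 6.
        System.out.println(joinableLocally(keys, 4, 6)); // prints false
    }
}
```

When the two sides disagree like this, one of them has to be repartitioned before the join, which is the cost the strategies above try to pay only once.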

The next concept you’ll likely encounter after optimizing repartitioning is understanding Kafka Streams’ internal state management and how it impacts fault tolerance and exactly-once processing.
