Your Kafka Streams application has a mismatch between the partition counts of its input topics and the partitioning its topology expects.
This usually manifests as the application processing no data, or processing data for only a subset of its input topics, even though you can see data flowing into them. Kafka Streams creates one stream task per input partition (per sub-topology) and distributes those tasks across its stream threads. When partition counts, thread counts, or co-partitioning requirements don’t line up, tasks can sit idle, lag far behind, or fail to start at all, and processing appears to grind to a halt.
Here are the common reasons for this partition mismatch and how to fix them:
1. Input Topics Have More Partitions Than The Application’s num.stream.threads
This is the most frequent source of confusion. Kafka Streams creates one stream task per input partition, and num.stream.threads sets how many threads each application instance uses to run those tasks. If an input topic has, say, 10 partitions but your application runs with num.stream.threads=5, every partition is still assigned: each thread owns two tasks and works through them in turn. Nothing is left unassigned, but throughput is capped by the five threads, so consumer lag can build up until it looks as though some partitions are not being processed.
Diagnosis: Check your input topic’s partition count:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-input-topic
Note the PartitionCount value in the output. Then check your Kafka Streams application’s configuration for num.stream.threads, and multiply it by the number of running instances to get the total thread count.
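If you want to script this check, the partition count can be pulled out of the describe output. The following is a minimal sketch, assuming the summary line carries a PartitionCount field as printed by recent versions of kafka-topics.sh; the class name, method name, and sample line are hypothetical.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the partition count from the summary line
// of `kafka-topics.sh --describe` output.
final class PartitionCountParser {
    // Tolerates both "PartitionCount: 10" and "PartitionCount:10".
    private static final Pattern PARTITION_COUNT =
            Pattern.compile("PartitionCount:\\s*(\\d+)");

    static OptionalInt parse(String describeOutput) {
        Matcher m = PARTITION_COUNT.matcher(describeOutput);
        return m.find()
                ? OptionalInt.of(Integer.parseInt(m.group(1)))
                : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Sample line is illustrative, not captured from a real cluster.
        String sample =
                "Topic: my-input-topic\tPartitionCount: 10\tReplicationFactor: 1\tConfigs:";
        System.out.println(parse(sample));
    }
}
```

An empty result means the summary line was not found, which is worth treating as a failed check rather than as zero partitions.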
Fix:
If processing cannot keep up, raise the total number of stream threads (num.stream.threads multiplied by the number of instances) toward the number of input partitions. Threads beyond the task count stay idle, so the partition count is the useful upper bound. For example, if my-input-topic has 10 partitions and you run a single instance, set the following on the Properties object you pass to KafkaStreams (called props here):
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
Why it works: Each task is processed by exactly one thread at a time. With as many threads as tasks, every partition gets a dedicated thread instead of sharing one with other partitions.
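As a rough mental model of how tasks relate to threads, the sketch below spreads one task per partition across the available threads in round-robin order. It assumes a thread may own several tasks, which is how Kafka Streams schedules work, but it is a simplification: the real assignor also weighs things like state store locality. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Kafka Streams' actual assignor: spreads one task per
// input partition across the available stream threads in round-robin order.
final class TaskSpread {
    static List<List<Integer>> assign(int partitions, int threads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int task = 0; task < partitions; task++) {
            perThread.get(task % threads).add(task);
        }
        return perThread;
    }

    public static void main(String[] args) {
        // 10 partitions on 5 threads: every task is owned, two per thread.
        System.out.println(assign(10, 5));
        // 5 partitions on 10 threads: five threads end up with no task.
        System.out.println(assign(5, 10));
    }
}
```

In this model no task is ever dropped when threads are scarce; the cost of too few threads is shared processing time, and the cost of too many is idle threads.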
2. Input Topics Have Fewer Partitions Than The Application’s num.stream.threads
This is less common and is not an error in itself, but it wastes resources. If you have num.stream.threads=10 and your input topic only has 5 partitions, Kafka Streams creates 5 tasks, so 5 threads receive work and the other 5 sit idle. Idle threads still join the consumer group and take part in every rebalance, which adds overhead without adding throughput.
Diagnosis:
Same as above, check your input topic’s partition count and num.stream.threads.
Fix:
Decrease num.stream.threads so that the total thread count across all instances does not exceed the number of tasks. If you have multiple input topics, base the limit on the application’s total task count rather than on any single topic.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5); // If the application has 5 tasks
Why it works: You no longer provision more threads than there are tasks to run, so every thread has work and rebalances involve fewer group members.
3. Uneven Partition Counts Across Multiple Input Topics
If your application consumes from multiple topics (e.g., topic-a with 3 partitions and topic-b with 10 partitions), the task count depends on how the topology uses them. Topics read by the same sub-topology produce as many tasks as the largest partition count among them (10 in this example); topics read by independent sub-topologies each contribute their own tasks (3 + 10 = 13). If num.stream.threads is lower than the task count, all tasks are still assigned, but the threads share them, and the topic with more partitions may lag noticeably behind the smaller one.
Diagnosis: List all input topics and their partition counts:
kafka-topics.sh --bootstrap-server localhost:9092 --list | xargs -I {} kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic {} | grep "PartitionCount"
Identify the topic with the highest number of partitions. Compare this maximum partition count to your num.stream.threads configuration.
Fix:
Size the total number of stream threads against the task count. To give every task its own thread, set num.stream.threads (multiplied by the number of instances) to the task count; a lower value still works but processes tasks in turn.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10); // If topic-a and topic-b share a sub-topology with 10 tasks
Why it works: Kafka Streams creates one task per input partition within each sub-topology. Matching the thread count to the task count lets the most partition-heavy topic be processed fully in parallel instead of queuing behind other tasks.
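The task-count arithmetic for several input topics can be sketched as follows, assuming the rule that a sub-topology yields as many tasks as the largest partition count among its source topics and that independent sub-topologies add up. The class and method names are hypothetical; the partition counts are the topic-a and topic-b figures from the example.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the task-count rule: one sub-topology yields as many tasks as the
// largest partition count among its source topics; sub-topologies add up.
final class TaskCount {
    static int tasksForSubTopology(List<Integer> sourcePartitionCounts) {
        return sourcePartitionCounts.stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    static int totalTasks(List<List<Integer>> subTopologies) {
        return subTopologies.stream()
                .mapToInt(TaskCount::tasksForSubTopology)
                .sum();
    }

    public static void main(String[] args) {
        // topic-a (3 partitions) and topic-b (10 partitions) in one sub-topology.
        System.out.println(tasksForSubTopology(Arrays.asList(3, 10)));
        // The same two topics consumed by two independent sub-topologies.
        System.out.println(totalTasks(
                Arrays.asList(Arrays.asList(3), Arrays.asList(10))));
    }
}
```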
4. Rebalancing Issues and Task Assignment Delays
Sometimes, the partition matching is correct, but the application is stuck in a rebalance or hasn’t had enough time to assign tasks. This can happen on startup or after scaling up/down.
Diagnosis:
Check the Kafka Streams application logs for messages related to rebalancing and task assignment. Look for StreamThread log lines, in particular state transitions between PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, and RUNNING. A thread that keeps cycling through these states without settling in RUNNING points to repeated rebalances.
Fix:
Ensure session.timeout.ms and max.poll.interval.ms are appropriately configured. If processing a single batch of records takes longer than max.poll.interval.ms, the consumer is considered failed and leaves the group, which triggers a rebalance. Also check that the broker setting group.initial.rebalance.delay.ms is not excessively high, since it delays the first assignment when a group starts up.
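// In your Streams configuration; these are consumer-level settings, so the keys come from ConsumerConfig
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000); // e.g., 30 seconds
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300000); // e.g., 5 minutes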
Why it works: session.timeout.ms is how long the broker waits without receiving a heartbeat before it removes the consumer from the group, and max.poll.interval.ms is the longest allowed gap between two poll() calls. Setting them to match your processing time keeps the consumer in the group while it works, so rebalances are not triggered unnecessarily.
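A quick way to sanity-check max.poll.interval.ms is to estimate the worst-case time needed to process one poll’s worth of records. The sketch below is a back-of-the-envelope calculation under assumed numbers; the class name, method name, and per-record timings are hypothetical, and real processing time varies per record.

```java
// Back-of-the-envelope check: can one poll's worth of records be processed
// before max.poll.interval.ms expires?
final class PollBudget {
    static boolean fitsWithinPollInterval(int maxPollRecords,
                                          long perRecordMillis,
                                          long maxPollIntervalMs) {
        long worstCaseBatchMillis = (long) maxPollRecords * perRecordMillis;
        return worstCaseBatchMillis < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 1000 records per poll, 5-minute interval.
        System.out.println(fitsWithinPollInterval(1000, 200, 300_000)); // 200 s of work
        System.out.println(fitsWithinPollInterval(1000, 400, 300_000)); // 400 s of work
    }
}
```

If the estimate does not fit, the usual options are raising max.poll.interval.ms, lowering max.poll.records, or making per-record processing cheaper.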
5. Incorrect num.partitions for Output/Internal Topics
While the symptom shows up on input topics, internal topics (repartition topics and the changelog topics that back state stores) and output topics also have partitions. If an internal topic already exists with a different number of partitions than the topology expects, for example after an input topic’s partition count changed, Kafka Streams can fail during startup or rebalance, especially if your topology relies on co-partitioning.
Diagnosis:
If your application uses KTable or joins, check the partition counts of any internal topics created by Kafka Streams (e.g., changelog topics for state stores) and any output topics you’ve explicitly defined.
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-app-state-store-changelog
Fix:
When creating topics that Kafka Streams will interact with (either output topics or topics you explicitly manage for state stores), ensure their partition count is compatible with your stream processing topology. An output topic needs no particular partition count on its own, but if a downstream application will join on it, giving it the same number of partitions as the maximum input topic partition count is a good starting point for co-partitioning.
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-output-topic --partitions 10 --replication-factor 1
Why it works: Co-partitioning is required for joins. If the partition counts don’t align between the topics involved, records with the same key can land on different partition numbers in each topic, so no single task sees both sides of the join.
6. Using KTable with Non-Co-partitioned Topics
When you join a KStream or KTable with another KTable, Kafka Streams expects the underlying topics to be co-partitioned. Co-partitioning means they have the same number of partitions and were written with the same partitioning strategy (by default, a murmur2 hash of the serialized key modulo the partition count). If they aren’t co-partitioned, records with the same key are not guaranteed to end up on the same partition number, making the join impossible to perform in a distributed manner. When the partition counts differ, Kafka Streams rejects the topology with a TopologyException rather than assigning tasks.
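To see why the partition count matters for key locality, consider the sketch below. It uses String.hashCode() as a stand-in for the hash function (Kafka’s default partitioner uses murmur2 on the serialized key, so real partition numbers will differ); the class and method names are hypothetical.

```java
// Stand-in for a hash partitioner: Kafka's default uses murmur2 on the
// serialized key, but String.hashCode() is enough to show the effect.
final class KeyLocality {
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = "a"; // "a".hashCode() is 97
        System.out.println(partitionFor(key, 3));  // 97 mod 3
        System.out.println(partitionFor(key, 10)); // 97 mod 10
    }
}
```

The same key maps to a different partition number once the partition count changes, so the task that owns partition N of one topic would not see the matching records in partition N of the other.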
Diagnosis:
Verify the partition counts of all topics involved in KTable operations.
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic1
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic2
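Once you have the counts, the comparison itself is simple to automate. This is a minimal sketch, assuming you have already collected the partition count per topic into a map; the class name, method name, and sample counts are hypothetical, and it checks partition counts only, not the partitioner.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;

// Checks the partition-count half of co-partitioning: every topic taking
// part in the join must report the same number of partitions.
final class CoPartitionCheck {
    static boolean sameCounts(Map<String, Integer> partitionsByTopic) {
        return new HashSet<>(partitionsByTopic.values()).size() <= 1;
    }

    public static void main(String[] args) {
        // Illustrative counts, as you might read them from the describe output.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("topic1", 3);
        counts.put("topic2", 10);
        System.out.println(sameCounts(counts));
    }
}
```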
Fix:
Ensure all topics involved in KTable joins or aggregations have the same number of partitions. You may need to repartition one of the topics using a KStream.repartition() operation before the join/aggregation.
// Example of repartitioning before a join
KStream<Key, Value1> stream1 = builder.stream("topic1");
KTable<Key, Value2> table2 = builder.table("topic2"); // Assuming topic2 has a different partition count
// Repartition stream1 to match table2's partition count
KStream<Key, Value1> repartitionedStream1 = stream1.repartition(
    Repartitioned.<Key, Value1>as("repartitioned-topic1")
        // Default partitioning is used; add withStreamPartitioner(...) only if topic2 was written with a custom partitioner
        .withNumberOfPartitions(numberOfPartitionsInTopic2)
);
// A KStream-KTable join returns a KStream, not a KTable
KStream<Key, JoinedValue> joinedStream = repartitionedStream1.join(table2, ...);
Why it works: By explicitly repartitioning one of the streams/tables, you force Kafka Streams to create a new intermediate topic with the desired number of partitions and a compatible partitioner. This ensures that records with the same key are now guaranteed to reside on the same partition across both inputs, allowing the join/aggregation task to be assigned.
After applying these fixes, the next error you’re likely to encounter is related to serialization/deserialization if your data formats don’t match between topics or if you haven’t configured the correct Serdes.