Kafka Streams’ default partitioner might be sending your data to the wrong Kafka partitions, leading to uneven processing or stateful logic that silently sees only part of an entity’s data.

Here’s how you can take granular control of data routing in Kafka Streams using a custom partitioner.

Let’s imagine we have a Kafka topic user-events that we want to process using Kafka Streams. Each event has a userId and an eventType. We want to ensure all events for a specific userId land on the same Kafka partition so that our stream processing logic (e.g., aggregating user activity) can operate on a single thread for that user.

Here’s a simplified UserEvent case class:

case class UserEvent(userId: String, eventType: String, timestamp: Long)

And here’s how we’d set up a Kafka Streams application without a custom partitioner:

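import org.apache.kafka.common.serialization.{Serdes => JSerdes}
import org.apache.kafka.streams.scala.ImplicitConversions._ // implicit Consumed/Produced/Grouped from serdes
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._ // Kafka 2.7+; older versions use org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import java.util.Properties

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-events-processor")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Fallback serdes only: the Scala DSL passes serdes explicitly via implicits,
// so UserEvent values still need an implicit Serde[UserEvent] in scope.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSerdes.String().getClass.getName)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSerdes.String().getClass.getName)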

val builder = new StreamsBuilder()
val userEventsStream: KStream[String, UserEvent] = builder.stream[String, UserEvent]("user-events")

// Assume UserEvent is serialized/deserialized correctly.
// For simplicity, we're not showing the serde for UserEvent here,
// but in a real app you'd have an implicit UserEventSerde in scope.

// The problem: if we don't specify a partitioner, Kafka Streams falls back
// to default partitioning for `KStream.to`, which hashes the record's key.
// If the key is null or isn't the userId (say, an event ID), events for the
// same user can land on different partitions.
// If we explicitly set the key to `userId` first, the default key hash
// already keeps each user's events together.

// Let's say we want to write to a new topic `user-activity-agg` while keeping
// a key other than userId. Default partitioning could then spread the events
// of "user123" across several partitions of `user-activity-agg`,
// defeating our goal.

// userEventsStream.to("user-activity-agg") // This uses default partitioning

// To route by userId without changing the key, we need a custom partitioner.

The core problem is that when you don’t supply a partitioner, Kafka Streams partitions records the same way the Java producer does: it hashes the serialized key (murmur2) and takes the result modulo the number of partitions. That applies both when you write to a topic with KStream.to and when Kafka Streams repartitions internally. Records with null keys aren’t hashed at all; the producer spreads them across partitions. If your key isn’t consistently the userId, or if you need more sophisticated routing logic, the default behavior won’t suffice. You might end up with data for the same userId spread across different partitions, breaking your stateful processing assumptions or causing load imbalance.
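To make the key-hashing behavior concrete, here is a pure-Scala sketch of how a non-null String key maps to a partition under default partitioning: murmur2 over the key’s UTF-8 bytes, sign bit cleared, modulo the partition count. The murmur2 function is our own port of the one in Kafka’s client utilities (org.apache.kafka.common.utils.Utils.murmur2), written out so the example has no dependencies; treat it as an illustration rather than a drop-in. The event IDs and the count of 6 partitions are made up.

```scala
import java.nio.charset.StandardCharsets

object DefaultPartitioningSketch {
  // Illustrative port of Kafka's murmur2; in real code call Utils.murmur2 instead.
  def murmur2(data: Array[Byte]): Int = {
    val seed = 0x9747b28cL.toInt // same bit pattern as Java's int literal 0x9747b28c
    val m = 0x5bd1e995
    val r = 24
    val length = data.length
    var h = seed ^ length
    // Mix the input four bytes at a time (little-endian)
    for (i <- 0 until length / 4) {
      val i4 = i * 4
      var k = (data(i4) & 0xff) + ((data(i4 + 1) & 0xff) << 8) +
        ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
      k *= m
      k ^= k >>> r
      k *= m
      h *= m
      h ^= k
    }
    // Mix the remaining 1-3 bytes (mirrors the Java switch fall-through)
    val tail = length & ~3
    val rem = length % 4
    if (rem == 3) h ^= (data(tail + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
    if (rem >= 1) { h ^= data(tail) & 0xff; h *= m }
    // Final avalanche
    h ^= h >>> 13
    h *= m
    h ^= h >>> 15
    h
  }

  // Default placement for a non-null String key serialized as UTF-8
  def defaultPartition(key: String, numPartitions: Int): Int =
    (murmur2(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    val numPartitions = 6
    // Three events for user123, keyed by (hypothetical) event IDs:
    // nothing ties these keys together, so they may land on different partitions
    val eventIds = Seq("evt-1001", "evt-1002", "evt-1003")
    println(eventIds.map(id => id -> defaultPartition(id, numPartitions)))
    // Keyed by userId instead, every event for the user gets the same partition
    println(Seq.fill(3)("user123").map(defaultPartition(_, numPartitions)).distinct)
  }
}
```

The second line always prints a single partition, because the placement is a pure function of the key bytes and the partition count.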

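The solution is to implement org.apache.kafka.streams.processor.StreamPartitioner[K, V]. Its core method is partition(topic: String, key: K, value: V, numPartitions: Int): Integer, which returns the target partition index; returning null tells Kafka Streams to fall back to its default partitioning.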

Here’s a custom partitioner that routes based on userId:

import org.apache.kafka.streams.processor.StreamPartitioner

class UserEventPartitioner extends StreamPartitioner[String, UserEvent] {
  // The Java interface returns java.lang.Integer; Scala's Predef converts the Int result.
  override def partition(topic: String, key: String, value: UserEvent, numPartitions: Int): Integer = {
    if (value == null) {
      null // tombstone: no userId to route on, so use default partitioning
    } else {
      // We want to partition based on userId, which is inside the UserEvent value
      val userId = value.userId
      // Clear the sign bit (Math.abs stays negative for Int.MinValue), then take
      // the result modulo the partition count. This ensures all events for the
      // same userId go to the same partition.
      (userId.hashCode & Int.MaxValue) % numPartitions
    }
  }
}

Now, we need to tell Kafka Streams to use this custom partitioner. You pass it through Produced when you call to, or through Repartitioned when you call repartition; groupByKey and groupBy have no partitioner parameter of their own.

Let’s modify the stream processing to use our custom partitioner when writing to a new topic:

import org.apache.kafka.common.serialization.{Serdes => JSerdes}
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.Produced

// ... (props, builder, userEventsStream as defined before) ...

// Assume UserEvent is correctly serialized and deserialized by a UserEventSerde,
// available here as `userEventSerde: Serde[UserEvent]`.
// The routing logic lives entirely in the UserEvent value, so we keep the
// existing key as-is: UserEventPartitioner never looks at it.

// Use the custom partitioner when writing to the output topic.
// In the Scala DSL, `to` takes a Produced in its second parameter list.
userEventsStream.to("user-activity-agg")(
  Produced.`with`(JSerdes.String(), userEventSerde)
    .withStreamPartitioner(new UserEventPartitioner())
)

val streams = new KafkaStreams(builder.build(), props)
streams.start()

// To shutdown gracefully:
// Runtime.getRuntime.addShutdownHook(new Thread(() => streams.close()))

In this corrected example:

  1. We leave the record key alone. When a StreamPartitioner is supplied, Kafka Streams calls it for every record regardless of the key, and UserEventPartitioner reads only the value, so there’s no need to null out the key with selectKey. Keeping a real key also matters downstream: groupByKey and aggregations drop records with null keys, and compacted topics reject them.
  2. We then call userEventsStream.to("user-activity-agg")(Produced.`with`(...).withStreamPartitioner(new UserEventPartitioner())). Kafka Streams will now use UserEventPartitioner to decide which partition in user-activity-agg each UserEvent record is written to.

How it works mechanically: The UserEventPartitioner’s partition method receives the UserEvent object and extracts the userId from the value. It then turns the userId’s hash code into a non-negative number and takes it modulo numPartitions (the total number of partitions in the target topic user-activity-agg). Because String.hashCode is deterministic, any two UserEvent objects with the same userId always produce the same partition index, provided the number of partitions in the topic stays constant. Two caveats: Math.abs(Integer.MIN_VALUE) is still negative, so masking off the sign bit (hashCode & Int.MaxValue) is the safer way to get a non-negative hash; and String.hashCode is not the murmur2 hash Kafka uses by default, so a topic written with this partitioner is not laid out like one partitioned by userId keys.
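A quick pure-Scala check of that arithmetic, with no Kafka dependency: absPartition reproduces the Math.abs(hash) % numPartitions pattern and maskedPartition the sign-bit mask. The partition count of 3 and the userIds are arbitrary choices for the sketch.

```scala
object PartitionIndexSketch {
  // The Math.abs(hash) % numPartitions pattern
  def absPartition(hash: Int, numPartitions: Int): Int =
    Math.abs(hash) % numPartitions

  // Clearing the sign bit keeps the result in [0, numPartitions) for every hash
  def maskedPartition(hash: Int, numPartitions: Int): Int =
    (hash & Int.MaxValue) % numPartitions

  def main(args: Array[String]): Unit = {
    // Math.abs cannot negate Int.MinValue, so it stays negative
    println(Math.abs(Int.MinValue))           // -2147483648
    println(absPartition(Int.MinValue, 3))    // -2, not a valid partition index
    println(maskedPartition(Int.MinValue, 3)) // 0

    // Same userId -> same partition, as long as numPartitions is unchanged:
    // every list in the printed map has exactly one element
    val userIds = Seq("user123", "user456", "user123", "user789", "user456")
    val partitionsPerUser = userIds.groupBy(identity).map { case (user, occurrences) =>
      user -> occurrences.map(id => maskedPartition(id.hashCode, 3)).distinct
    }
    println(partitionsPerUser)
  }
}
```

A userId whose hashCode happens to be Int.MinValue is rare, but with the abs version it would make the producer reject the record with an invalid partition rather than route it.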

When to use a custom partitioner:

  • Stateful Processing: When you need to ensure all records belonging to a specific logical entity (like a user, session, or device) land on the same partition for stateful operations (aggregations, joins, windowing).
  • Load Balancing: To distribute data more evenly across partitions based on a meaningful attribute, rather than a hash of a key whose values are heavily skewed.
  • Specific Routing Logic: When your partitioning needs are more complex than a simple hash, e.g., routing based on geographic region, event type, or a combination of fields.
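As an illustration of the last bullet, here is a sketch of routing on a combination of fields: each region owns a contiguous block of partitions, and users are spread within their region’s block by userId hash. The region field, the regionBlocks table, and partitionFor are all hypothetical (UserEvent in this article has no region); inside a real StreamPartitioner you would call something like partitionFor from partition.

```scala
object RegionRoutingSketch {
  // Hypothetical region table: each region owns a contiguous block of partitions
  val regionBlocks: Map[String, Range] = Map(
    "eu"   -> (0 until 4),
    "us"   -> (4 until 10),
    "apac" -> (10 until 12)
  )

  // Route by region first, then spread users inside that region's block by userId hash.
  // Unknown regions, or blocks that don't fit the topic, fall back to hashing over all partitions.
  def partitionFor(region: String, userId: String, numPartitions: Int): Int = {
    val hash = userId.hashCode & Int.MaxValue
    regionBlocks.get(region).filter(_.last < numPartitions) match {
      case Some(block) => block.start + hash % block.size
      case None        => hash % numPartitions
    }
  }

  def main(args: Array[String]): Unit = {
    println(partitionFor("us", "user123", 12))   // somewhere in 4..9
    println(partitionFor("eu", "user123", 12))   // somewhere in 0..3
    println(partitionFor("mars", "user123", 12)) // fallback: somewhere in 0..11
  }
}
```

One design caveat: if a user’s region can change, their later events move to a different block, which splits any per-user state across partitions.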

The one thing most people don’t know: when you change the key (for example with selectKey or map) and then call KStream.groupByKey, or when you call KStream.groupBy or KTable.groupBy, Kafka Streams inserts an internal repartition topic before the aggregation or join. That repartitioning uses default key-hash partitioning, and Grouped cannot change it: Grouped only carries a name and serdes (the older Serialized class is deprecated). To control the routing for a KStream, repartition explicitly with your partitioner first and then group: myKStream.repartition(Repartitioned.`with`(stringSerde, userEventSerde).withStreamPartitioner(new UserEventPartitioner())).groupByKey. Because repartition already yields a partitioned stream, groupByKey doesn’t add a second repartition topic. KStream.repartition is available from Kafka 2.6. Keep in mind that aggregations still group by key, so for a per-user aggregate the key must be the userId; the partitioner only decides which partition each record lives on.

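The next errors you’ll hit are likely serialization/deserialization failures if your UserEventSerde isn’t correctly implemented, or, once you join topics, a co-partitioning problem: both sides of a join must use the same partition count and the same partitioning function. Kafka Streams checks the partition counts, but it cannot tell that one side was written with UserEventPartitioner and the other with default key hashing, so that mismatch shows up as missing join results rather than an exception.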
