Kafka Streams lets you join two streams together, but what if the key you want to join on sits inside the record value instead of in the record key? That’s where a foreign-key join comes in, and the most surprising thing is that it isn’t really a "join" in the traditional database sense at all. It’s closer to a lookup: each incoming event is matched against the latest value held in a locally materialized table.
Let’s see it in action. Imagine we have two Kafka topics: user-events and user-profiles.
user-events might look like this:
{"user_id": "user123", "event_type": "login", "timestamp": 1678886400}
{"user_id": "user456", "event_type": "click", "timestamp": 1678886410}
{"user_id": "user123", "event_type": "logout", "timestamp": 1678886420}
And user-profiles:
{"id": "user123", "name": "Alice", "email": "alice@example.com"}
{"id": "user789", "name": "Bob", "email": "bob@example.com"}
{"id": "user456", "name": "Charlie", "email": "charlie@example.com"}
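Notice that the user_id field in user-events corresponds to the id field in user-profiles. We want to enrich the user-events stream with each user’s name and email. The code below relies on one assumption: user-profiles records are keyed by that id (the record with "id": "user123" has the Kafka key user123), because a KTable is keyed by the record key, not by a field inside the value.
Here’s how you’d set this up in Kafka Streams by rekeying the events and joining them against a table: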
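import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
// The topology is built on a StreamsBuilder; pass builder.build() to a KafkaStreams instance afterwards.
StreamsBuilder builder = new StreamsBuilder();
// A Serde<JsonNode> from Kafka Connect's JSON (de)serializers (connect-json artifact); any Serde<JsonNode> works.
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
// 1. Define the sources: events as a stream, profiles as a table keyed by user id
KStream<String, JsonNode> userEventsStream = builder.stream("user-events", Consumed.with(Serdes.String(), jsonSerde));
KTable<String, JsonNode> userProfilesTable = builder.table("user-profiles", Consumed.with(Serdes.String(), jsonSerde));
// 2. Rekey the events by the foreign key (user_id) so their keys match the table's keys.
// Changing the key makes Kafka Streams repartition the stream before the join.
KStream<String, JsonNode> rekeyedUserEvents =
    userEventsStream.selectKey((key, value) -> value.get("user_id").asText());
// 3. Stream-table (inner) join
// Each event on rekeyedUserEvents looks up its key in userProfilesTable.
// The joiner runs only when a profile exists for that key; otherwise the event is dropped.
// A KStream-KTable join takes no JoinWindows: it always uses the table's current value.
KStream<String, JsonNode> enrichedEvents = rekeyedUserEvents.join(
    userProfilesTable,
    (eventValue, profileValue) -> {
        // Joiner: combine the event (left) and the profile (right) into the output value.
        // Copy the event instead of mutating the input record in place.
        ObjectNode enrichedEvent = ((ObjectNode) eventValue).deepCopy();
        enrichedEvent.put("user_name", profileValue.get("name").asText());
        enrichedEvent.put("user_email", profileValue.get("email").asText());
        return enrichedEvent;
    },
    // Serdes for the internal repartition topic created because of selectKey
    Joined.with(Serdes.String(), jsonSerde, jsonSerde)
);
// 4. Write the enriched events to a new topic
enrichedEvents.to("enriched-user-events", Produced.with(Serdes.String(), jsonSerde));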
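Because selectKey changes the record key, Kafka Streams routes the rekeyed events through an internal repartition topic before the join, so each event lands on the partition number that holds the matching profile. That only lines up if both sides are partitioned the same way by key (co-partitioning). The toy model below is plain Java, not the Kafka Streams API; RekeySketch, partitionFor and rekeyAndRoute are hypothetical names, and String.hashCode is a stand-in for the murmur2 hash that Kafka’s default partitioner applies to the serialized key, so the partition numbers will not match a real cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of selectKey followed by a repartition step (hypothetical; not the Kafka Streams API).
class RekeySketch {
    // Stand-in for key-based partitioning. Kafka's default partitioner uses murmur2 over the
    // serialized key bytes; String.hashCode is used here only for illustration.
    // Masking the sign bit keeps the result non-negative.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Derive a new key from each value (like selectKey) and route the record to the partition
    // that owns that key. Values pass through unchanged. Assumes the extractor never returns null.
    static <V> List<List<Map.Entry<String, V>>> rekeyAndRoute(
            List<V> values, Function<V, String> keyExtractor, int numPartitions) {
        List<List<Map.Entry<String, V>>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (V v : values) {
            String newKey = keyExtractor.apply(v);
            partitions.get(partitionFor(newKey, numPartitions)).add(Map.entry(newKey, v));
        }
        return partitions;
    }
}
```

Every record with key user123 ends up in the same partition, which is what lets the join task for that partition find the profile in its local store, provided user-profiles is partitioned by the same key with the same partition count.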
The output in enriched-user-events would look like this:
{"user_id": "user123", "event_type": "login", "timestamp": 1678886400, "user_name": "Alice", "user_email": "alice@example.com"}
{"user_id": "user456", "event_type": "click", "timestamp": 1678886410, "user_name": "Charlie", "user_email": "charlie@example.com"}
{"user_id": "user123", "event_type": "logout", "timestamp": 1678886420, "user_name": "Alice", "user_email": "alice@example.com"}
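Under the hood, userProfilesTable is materialized in a local state store (RocksDB by default): a mutable key-value store holding the latest profile for each id, backed by a changelog topic so it can be restored after a failure. When a new event arrives on rekeyedUserEvents, Kafka Streams looks up that event’s key in the local store; no remote call is involved. If it finds a profile, the joiner combines the two values. If not, the inner join simply drops the event; had we used leftJoin rather than join, the joiner would be called with a null profile. (A KStream-KTable join supports only inner and left joins, not outer joins.)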
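To make the lookup semantics concrete, here is a toy, single-threaded model in plain Java. It is not the Kafka Streams API, and StreamTableJoinSketch, onTableRecord and onStreamRecord are hypothetical names. A HashMap stands in for the local state store: table records upsert a key or, with a null value (a tombstone), delete it, and only stream records produce output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model of KStream-KTable join semantics (hypothetical; not the Kafka Streams API).
class StreamTableJoinSketch<V, T, R> {
    private final Map<String, T> store = new HashMap<>(); // stands in for the local state store
    private final BiFunction<V, T, R> joiner;
    private final boolean leftJoin; // false models join(), true models leftJoin()
    private final List<R> output = new ArrayList<>();

    StreamTableJoinSketch(BiFunction<V, T, R> joiner, boolean leftJoin) {
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    // Table side: keep only the latest value per key; a null value deletes the key.
    // Table updates never produce join output on their own.
    void onTableRecord(String key, T value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    // Stream side: look up the current table value for this key.
    void onStreamRecord(String key, V value) {
        T match = store.get(key);
        if (match != null) {
            output.add(joiner.apply(value, match));
        } else if (leftJoin) {
            output.add(joiner.apply(value, null)); // left join: the joiner sees a null profile
        }
        // Inner join with no match: the record is dropped.
    }

    List<R> output() {
        return output;
    }
}
```

Feeding it the sample profiles and then the three sample events yields one joined result per event, matching the names in the output above; an event for an unknown id produces nothing with the inner join and a null-profile result with the left join.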
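What many people miss is how this join behaves when events and profile updates arrive out of order. Only the stream side triggers output: a profile update changes the value in the state store, but it does not re-process events that have already passed through the join. An event that arrives before its profile exists is dropped by the inner join (or emitted with a null profile by a left join) and is not revisited when the profile later shows up, while an event processed after a profile update sees the new value. JoinWindows plays no role here; it configures KStream-KStream joins, where it bounds how far apart in time two records may be and still join. If you need results that are updated when either side changes, Kafka Streams 2.4 and later provide a dedicated foreign-key join between two tables, KTable#join(other, foreignKeyExtractor, joiner), which re-emits the joined row when the referenced row changes.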
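A second toy model makes the arrival-order behaviour of an inner stream-table join concrete: a profile update changes the stored value but never re-processes events that already went through the join. This is plain Java (Java 16+ for the nested record), not the Kafka Streams API; ArrivalOrderSketch and Input are hypothetical names, and records are processed strictly in the order given, as on a single partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical; not the Kafka Streams API) of an inner KStream-KTable join
// consuming an interleaved sequence of table and stream records in arrival order.
class ArrivalOrderSketch {
    // isTable = true: a profile update (key = id, value = name).
    // isTable = false: an event (key = user id, value = event type).
    record Input(boolean isTable, String key, String value) {}

    static List<String> run(List<Input> inputs) {
        Map<String, String> latestName = new HashMap<>(); // current table state
        List<String> out = new ArrayList<>();
        for (Input in : inputs) {
            if (in.isTable()) {
                // Updates state only; earlier join results are never revisited.
                latestName.put(in.key(), in.value());
            } else {
                String name = latestName.get(in.key());
                if (name != null) {
                    out.add(in.value() + "->" + name); // joined with the value current at this moment
                }
                // No profile yet: the event is dropped for good (inner join).
            }
        }
        return out;
    }
}
```

With the hypothetical sequence login event, profile "Alice", click event, profile update "Alicia", logout event, the output is click->Alice followed by logout->Alicia: the login is lost, and click->Alice is not re-emitted after the update.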
The next step is to explore how to handle events whose foreign key has no matching profile, which leads to null values, and how to use left joins effectively.