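Kafka Streams doesn’t have a separate "outer join" construct outside its regular join API. What you’re likely thinking of is the outerJoin operation, which behaves like a full outer join in a relational database, with one addition for stream-stream joins: records only match if they fall within a time window.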
Let’s see it in action. Imagine we have two Kafka topics: users and orders.
users topic (key: user_id, value: user_name)
1 -> Alice
2 -> Bob
3 -> Charlie
orders topic (key: user_id, value: order_details)
1 -> {item: "Laptop", price: 1200}
2 -> {item: "Keyboard", price: 75}
4 -> {item: "Mouse", price: 25}
We want to join users and orders on user_id to get a stream of user activity with their names.
Here’s how you’d set up an outerJoin in Kafka Streams:
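    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.StreamJoined;

    StreamsBuilder builder = new StreamsBuilder();
    // Explicit serdes, so the example does not depend on the configured defaults.
    KStream<Long, String> userStream =
        builder.stream("users", Consumed.with(Serdes.Long(), Serdes.String()));
    // Assuming OrderSerde is a Serde<Order> defined elsewhere.
    KStream<Long, Order> orderStream =
        builder.stream("orders", Consumed.with(Serdes.Long(), new OrderSerde()));
    KStream<Long, String> joinedStream = userStream.outerJoin(
        orderStream,
        (userValue, orderValue) -> {
            if (userValue != null && orderValue != null) {
                return userValue + " ordered " + orderValue.getItem(); // Both present
            } else if (userValue != null) {
                return userValue + " has no orders"; // Only user present
            } else if (orderValue != null) {
                return "Unknown user ordered " + orderValue.getItem(); // Only order present (user_id 4)
            } else {
                return "This case should not happen in outer join";
            }
        },
        // Kafka Streams 3.0+; on older versions use JoinWindows.of(Duration.ofMinutes(5)).
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), // Window for join
        StreamJoined.with(Serdes.Long(), Serdes.String(), new OrderSerde())
    );
    joinedStream.to("user_orders_output", Produced.with(Serdes.Long(), Serdes.String()));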
Assuming each user record and its matching order have timestamps within five minutes of each other, the outerJoin of userStream and orderStream produces:
- User 1: Alice ordered Laptop (both user and order exist)
- User 2: Bob ordered Keyboard (both user and order exist)
- User 3: Charlie has no orders (user exists, but no matching order within the window)
- User 4: Unknown user ordered Mouse (order exists for user ID 4, but no user with ID 4 in the users stream)
This outerJoin is useful when you need to see every event from both streams, even if there’s no corresponding event on the other side. It’s not only about merging data where both sides have a match; it’s about ensuring that no record from either input stream is lost just because it has no match. The JoinWindows parameter is crucial here: it defines the maximum difference between the timestamps of two records for them to be considered a match. Note that this is based on record timestamps, not on when the records happen to be processed. If an order for Alice is timestamped 10 minutes after her user record and the window is 5 minutes, the two are not joined. The output contains "Alice has no orders" and, separately, "Unknown user ordered Laptop", even though both records exist.
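The window check itself comes down to a timestamp comparison. Here is a minimal sketch of that comparison in plain Java, with no Kafka dependency. The class and method names are made up for illustration, and it assumes the window bounds are inclusive, which is how I read the JoinWindows documentation.

```java
import java.time.Duration;

final class JoinWindowCheck {
    // Hypothetical helper: two records can join when the absolute difference
    // between their timestamps does not exceed the configured time difference.
    // Inclusive bounds are an assumption based on the JoinWindows docs.
    static boolean withinWindow(long leftTsMs, long rightTsMs, Duration timeDifference) {
        return Math.abs(leftTsMs - rightTsMs) <= timeDifference.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(5);
        long userTs = 0L;
        // Order 3 minutes after the user record: inside the window.
        System.out.println(withinWindow(userTs, Duration.ofMinutes(3).toMillis(), window));  // prints true
        // Order 10 minutes after the user record: outside the window.
        System.out.println(withinWindow(userTs, Duration.ofMinutes(10).toMillis(), window)); // prints false
    }
}
```

The check is symmetric, so an order that is timestamped before its user record matches just as well as one timestamped after it.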
The key difference from a leftJoin (which Kafka Streams also provides) is how unmatched records from the second stream are handled. In a leftJoin (e.g., userStream.leftJoin(orderStream, ...)), records from orderStream that have no match in userStream are dropped. The outerJoin also emits these unmatched records from the "right" side (orderStream in this example), which is where the "Unknown user ordered Mouse" output comes from.
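To make the difference concrete, here is a small plain-Java model of the two join types over the sample data. It is only a sketch: it ignores windowing entirely, treats each topic as a simple key-to-value map, and uses a hypothetical LeftVsOuterSketch class rather than any Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LeftVsOuterSketch {
    // includeUnmatchedRight = false models leftJoin, true models outerJoin.
    // Windowing is deliberately left out of this model.
    static List<String> join(Map<Long, String> users, Map<Long, String> orders,
                             boolean includeUnmatchedRight) {
        Set<Long> keys = new LinkedHashSet<>(users.keySet());
        if (includeUnmatchedRight) {
            keys.addAll(orders.keySet()); // keys that exist only on the right side
        }
        List<String> out = new ArrayList<>();
        for (Long key : keys) {
            String user = users.get(key);
            String item = orders.get(key);
            if (user != null && item != null) {
                out.add(user + " ordered " + item);
            } else if (user != null) {
                out.add(user + " has no orders");
            } else {
                out.add("Unknown user ordered " + item);
            }
        }
        return out;
    }

    static Map<Long, String> users() {
        Map<Long, String> m = new LinkedHashMap<>();
        m.put(1L, "Alice");
        m.put(2L, "Bob");
        m.put(3L, "Charlie");
        return m;
    }

    static Map<Long, String> orders() {
        Map<Long, String> m = new LinkedHashMap<>();
        m.put(1L, "Laptop");
        m.put(2L, "Keyboard");
        m.put(4L, "Mouse");
        return m;
    }

    public static void main(String[] args) {
        System.out.println(join(users(), orders(), false)); // left join: 3 results
        System.out.println(join(users(), orders(), true));  // outer join: 4 results
    }
}
```

The left-join call yields three results and never mentions the Mouse order. The outer-join call yields the same three plus "Unknown user ordered Mouse".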
The lambda function you provide to outerJoin receives null for the value of a stream if no matching record is found on that side within the specified window. This is how you differentiate between a record present in both streams, only in the left stream, or only in the right stream. This allows you to construct the output value precisely based on which input streams contributed data.
The mechanism relies on Kafka Streams buffering records from both input streams in window stores for the duration defined by JoinWindows. When a record arrives on one stream, Kafka Streams checks the other stream’s buffer for records with the same join key whose timestamps fall within the window. If a match is found, the join function is called with both values. If not, the record stays buffered in case a matching record arrives later. For records that are still unmatched when their window closes, the join function is invoked with null for the missing stream’s value. Older releases (before 3.1, as far as I’m aware) could emit the null-sided result eagerly and then emit the matched result as well once the other record arrived, so the exact output depends on the version you run.
What most people miss is that JoinWindows isn’t just for finding matches; it also determines when records for unmatched keys are emitted. When the window for a record on either stream closes without a match from the other stream, that record is passed to the join function with a null value for the missing side, so it still appears in the output. Window close is tracked in stream time, which only advances as new records arrive. On a quiet topic, an unmatched result may therefore not show up until later records push stream time past the end of the window.
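The emit-on-window-close behavior is easier to see in a toy model. The sketch below is plain Java and is not how Kafka Streams is implemented internally: it keeps one in-memory buffer, ignores grace periods, and all names (OuterJoinWindowSketch, Rec, process) are hypothetical. It only illustrates the idea that unmatched records are emitted once stream time moves past their window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class OuterJoinWindowSketch {
    // One buffered input record; 'left' is true for users, false for orders.
    static final class Rec {
        final boolean left; final long key; final String value; final long ts;
        boolean matched = false;
        Rec(boolean left, long key, String value, long ts) {
            this.left = left; this.key = key; this.value = value; this.ts = ts;
        }
    }

    private final long windowMs;
    private final List<Rec> buffer = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE; // highest timestamp seen so far
    final List<String> output = new ArrayList<>();

    OuterJoinWindowSketch(long windowMs) { this.windowMs = windowMs; }

    void process(boolean left, long key, String value, long ts) {
        Rec incoming = new Rec(left, key, value, ts);
        // 1. Join against buffered records from the other side within the window.
        for (Rec other : buffer) {
            if (other.left != left && other.key == key && Math.abs(other.ts - ts) <= windowMs) {
                output.add(left ? join(value, other.value) : join(other.value, value));
                other.matched = true;
                incoming.matched = true;
            }
        }
        buffer.add(incoming);
        // 2. Advance stream time, then emit unmatched records whose window has closed.
        streamTime = Math.max(streamTime, ts);
        for (Iterator<Rec> it = buffer.iterator(); it.hasNext(); ) {
            Rec r = it.next();
            if (r.ts + windowMs < streamTime) {
                if (!r.matched) {
                    output.add(r.left ? join(r.value, null) : join(null, r.value));
                }
                it.remove();
            }
        }
    }

    // Same null handling as the join function in the article.
    static String join(String user, String item) {
        if (user != null && item != null) return user + " ordered " + item;
        if (user != null) return user + " has no orders";
        return "Unknown user ordered " + item;
    }

    public static void main(String[] args) {
        OuterJoinWindowSketch j = new OuterJoinWindowSketch(300_000L); // 5 minute window
        j.process(true, 1L, "Alice", 0L);
        j.process(false, 1L, "Laptop", 60_000L);   // matches Alice immediately
        j.process(true, 3L, "Charlie", 120_000L);  // no match yet, nothing emitted
        System.out.println(j.output);
        j.process(false, 4L, "Mouse", 600_000L);   // stream time jumps, Charlie's window closes
        System.out.println(j.output);
    }
}
```

After the first three records only "Alice ordered Laptop" has been emitted. "Charlie has no orders" appears only once the Mouse order advances stream time, and the Mouse order itself stays pending until an even later record arrives.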
The next concept you’ll likely encounter is the other join types, leftJoin and the inner join (the plain join method), and how each of them handles unmatched records.