The TopologyTestDriver is the unsung hero of Kafka Streams unit testing. It lets you spin up and test your entire stream processing pipeline without a live Kafka cluster. It runs your topology in-process and stands in for the brokers, producers, and consumers, so you can feed it input records and assert on the output records your topology generates. It ships in the separate kafka-streams-test-utils artifact.
Let’s see it in action. Imagine a simple Kafka Streams application that takes a stream of user events, filters out events that aren’t "purchases," and then aggregates the total purchase amount per user.
Here’s a snippet of the Kafka Streams Topology:
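StreamsBuilder builder = new StreamsBuilder();
// JsonNodeSerde is assumed to be your own Serde<JsonNode>; Kafka itself does not ship one
KStream<String, JsonNode> userEvents =
        builder.stream("user-events", Consumed.with(Serdes.String(), new JsonNodeSerde()));
// path() returns a MissingNode instead of null, so events without "eventType" are dropped instead of throwing
KStream<String, JsonNode> purchases = userEvents.filter((key, value) ->
        "purchase".equals(value.path("eventType").asText()));
// KGroupedStream.aggregate() takes an initializer, an aggregator, and a Materialized;
// there is no third "subtractor" argument (that only exists on KGroupedTable)
KTable<String, Integer> purchaseAmounts = purchases
        .groupByKey(Grouped.with(Serdes.String(), new JsonNodeSerde()))
        .aggregate(
                () -> 0,                                                              // Initializer: total starts at 0
                (key, value, aggregate) -> aggregate + value.path("amount").asInt(),  // Aggregator: add this purchase
                Materialized.with(Serdes.String(), Serdes.Integer()));               // Serdes for the state store
purchaseAmounts.toStream().to("purchase-amounts", Produced.with(Serdes.String(), Serdes.Integer()));
Topology topology = builder.build();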
And here’s how you’d unit test it using TopologyTestDriver:
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.jupiter.api.Test;

// Assumes JsonNodeSerde is your own Serde<JsonNode> with a no-arg constructor (not part of Kafka)
public class TopologyTest {

    private final ObjectMapper mapper = new ObjectMapper();

    @Test
    public void testPurchaseAggregation() {
        Topology topology = buildTopology();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // Never contacted by the driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonNodeSerde.class.getName());

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            // TestInputTopic/TestOutputTopic (Kafka 2.4+) replace the removed
            // ConsumerRecordFactory, OutputVerifier, and driver.readOutput()
            TestInputTopic<String, JsonNode> userEvents = driver.createInputTopic(
                    "user-events", new StringSerializer(), new JsonNodeSerde().serializer());
            TestOutputTopic<String, Integer> purchaseAmounts = driver.createOutputTopic(
                    "purchase-amounts", new StringDeserializer(), new IntegerDeserializer());

            // Send purchase events
            userEvents.pipeInput("user1", purchase("user1", 100));
            userEvents.pipeInput("user2", purchase("user2", 200));
            userEvents.pipeInput("user1", purchase("user1", 50));

            // Send a non-purchase event; the filter should drop it
            ObjectNode click = mapper.createObjectNode();
            click.put("userId", "user3");
            click.put("eventType", "click");
            click.put("page", "/home");
            userEvents.pipeInput("user3", click);

            // Each record is processed synchronously, and the driver behaves as if record caching
            // were disabled, so every KTable update (user1=100, user2=200, user1=150) is already in
            // the output topic. This non-windowed aggregation needs no time manipulation.
            // readKeyValuesToMap() keeps only the latest value per key.
            Map<String, Integer> finalState = purchaseAmounts.readKeyValuesToMap();

            assertThat(finalState.get("user1"), is(150));
            assertThat(finalState.get("user2"), is(200));
            assertThat(finalState.size(), is(2)); // user3's click never reached the aggregation
        }
    }

    // Builds a purchase event with the fields the topology reads
    private ObjectNode purchase(String userId, int amount) {
        ObjectNode node = mapper.createObjectNode();
        node.put("userId", userId);
        node.put("eventType", "purchase");
        node.put("amount", amount);
        return node;
    }

    // The topology from above (replace with your actual topology-building logic)
    private Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("user-events", Consumed.with(Serdes.String(), new JsonNodeSerde()))
                .filter((key, value) -> "purchase".equals(value.path("eventType").asText()))
                .groupByKey(Grouped.with(Serdes.String(), new JsonNodeSerde()))
                .aggregate(
                        () -> 0,
                        (key, value, aggregate) -> aggregate + value.path("amount").asInt(),
                        Materialized.with(Serdes.String(), Serdes.Integer()))
                .toStream()
                .to("purchase-amounts", Produced.with(Serdes.String(), Serdes.Integer()));
        return builder.build();
    }
}
The TopologyTestDriver simulates the Kafka environment. In Kafka 2.4 and later, you create a TestInputTopic with driver.createInputTopic() and feed it records with pipeInput(), then read results from a TestOutputTopic created with driver.createOutputTopic(). The older ConsumerRecordFactory, OutputVerifier, and driver.readOutput() helpers served the same purpose, but they were deprecated in 2.4 and removed in 3.0. Note that there is no advanceStreamTime() method: stream time advances only through the timestamps of the records you pipe in, while driver.advanceWallClockTime() moves the simulated wall clock.
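To see the difference between reading every update and reading only the latest value per key, here is a minimal sketch. To keep it self-contained it assumes a simplified, hypothetical "purchases" topic whose values are already plain Integer amounts (no JSON serde), and it uses readKeyValuesToList() instead of readKeyValuesToMap(). It requires kafka-streams and kafka-streams-test-utils 2.4 or later on the classpath.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class PurchaseUpdatesDemo {

    // Hypothetical simplified topology: values are Integer amounts, summed per user key
    static List<KeyValue<String, Integer>> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("purchases", Consumed.with(Serdes.String(), Serdes.Integer()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
               .reduce(Integer::sum, Materialized.with(Serdes.String(), Serdes.Integer()))
               .toStream()
               .to("purchase-amounts", Produced.with(Serdes.String(), Serdes.Integer()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "updates-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Integer> input = driver.createInputTopic(
                    "purchases", new StringSerializer(), new IntegerSerializer());
            TestOutputTopic<String, Integer> output = driver.createOutputTopic(
                    "purchase-amounts", new StringDeserializer(), new IntegerDeserializer());

            input.pipeInput("user1", 100);
            input.pipeInput("user2", 200);
            input.pipeInput("user1", 50);

            // One KTable update per input record, in processing order
            return output.readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        run().forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
    }
}
```

The list holds three updates (user1=100, user2=200, user1=150), which is exactly the changelog that readKeyValuesToMap() collapses down to the final value per key.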
The mental model for TopologyTestDriver is this: it's a single-threaded simulation of your Kafka Streams application. Each record you pipe in flows through the whole topology before pipeInput() returns, and the driver behaves as if commit.interval.ms and cache.max.bytes.buffering were both zero, so every state store update is committed and forwarded downstream immediately. You control the input and the flow of time, then inspect the output topics or, for named stores, the state stores themselves (for example with driver.getKeyValueStore()).
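Inspecting a state store directly can be simpler than decoding output topics. The sketch below reuses the hypothetical Integer-valued "purchases" topic and gives the aggregation's store an explicit, hypothetical name ("purchase-totals") via Materialized.as(), which is what makes it reachable through driver.getKeyValueStore(). For the timestamped stores the DSL creates, that method returns a plain key-value view of the values.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class StoreInspectionDemo {

    // Hypothetical store name; naming the store lets the test look it up
    static final String STORE = "purchase-totals";

    static Integer totalFor(String user) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("purchases", Consumed.with(Serdes.String(), Serdes.Integer()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
               .reduce(Integer::sum,
                       Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(STORE)
                               .withKeySerde(Serdes.String())
                               .withValueSerde(Serdes.Integer()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "store-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Integer> input = driver.createInputTopic(
                    "purchases", new StringSerializer(), new IntegerSerializer());
            input.pipeInput("user1", 100);
            input.pipeInput("user2", 200);
            input.pipeInput("user1", 50);

            // Read the aggregated total straight from the state store
            KeyValueStore<String, Integer> store = driver.getKeyValueStore(STORE);
            return store.get(user); // null for keys that were never aggregated
        }
    }

    public static void main(String[] args) {
        System.out.println("user1 total: " + totalFor("user1"));
    }
}
```

Store assertions check the current state regardless of how many intermediate updates were emitted, which keeps tests stable if you later change how results are forwarded.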
What surprises many people is that TopologyTestDriver never touches a broker or the network: input and output topics are simulated in memory. This makes your tests fast and isolated, but it also means you can't exercise network partitions, broker failures, or rebalances this way; those need higher-level integration tests against a real or embedded cluster. Persistent state stores are still backed by RocksDB in the configured state.dir, so always close() the driver (for example with try-with-resources) to release those resources.
One thing that often trips people up is how TopologyTestDriver handles time, especially with stateful operations. Stream time is driven entirely by record timestamps: each record you pipe in advances stream time if its timestamp is later than any seen so far. A TestInputTopic stamps records with the current wall-clock time unless you pass explicit timestamps or configure a start timestamp and auto-advance interval. A non-windowed aggregation like the one above needs no time manipulation, because every update is emitted as soon as its record is processed. Windowed operations are different: with suppress(Suppressed.untilWindowCloses(...)), a final result appears only after you pipe a record whose timestamp pushes stream time past the window end plus the grace period. Without that later record, the test runs but the output topic stays empty. driver.advanceWallClockTime(Duration) only moves the simulated wall clock, which fires wall-clock punctuations; it does not advance stream time.
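The following sketch shows stream time in action with a suppressed windowed count. It assumes Kafka 3.0+ (for TimeWindows.ofSizeWithNoGrace) and uses hypothetical "clicks" and "click-counts" topics with explicit record timestamps. Two clicks inside the first one-minute window produce no output; only a third record, timestamped after the window end, moves stream time far enough for the final count to be emitted.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowCloseDemo {

    // Returns [outputs read before the window closed, outputs read after]
    static List<List<KeyValue<String, Long>>> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // One-minute tumbling windows with no grace period
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
               .count()
               // Hold results until stream time passes the window end
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
               .to("click-counts", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "window-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "clicks", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                    "click-counts", new StringDeserializer(), new LongDeserializer());

            Instant t0 = Instant.ofEpochMilli(0); // first window covers [0s, 60s)
            input.pipeInput("user1", "/home", t0);
            input.pipeInput("user1", "/cart", t0.plusSeconds(10));
            List<KeyValue<String, Long>> beforeClose = output.readKeyValuesToList();

            // This later record advances stream time past the first window's end
            input.pipeInput("user1", "/checkout", t0.plusSeconds(70));
            List<KeyValue<String, Long>> afterClose = output.readKeyValuesToList();

            return List.of(beforeClose, afterClose);
        }
    }

    public static void main(String[] args) {
        List<List<KeyValue<String, Long>>> result = run();
        System.out.println("before close: " + result.get(0));
        System.out.println("after close:  " + result.get(1));
    }
}
```

Piping explicit timestamps keeps the test deterministic; relying on the default wall-clock timestamps would make window boundaries depend on when the test happens to run.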
After successfully testing your core topology, the next logical step is to consider how to handle more complex scenarios like error handling within your processing logic, or how to test interactions with external systems like databases or other microservices.