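Kafka Streams’ exactly-once processing guarantees that the effects of processing each record (its state store updates and output records) are committed exactly once, even in the face of failures. A record may be read again after a crash, but the results of the failed attempt are discarded rather than duplicated.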
Let’s see it in action. Imagine we have a Kafka topic orders containing incoming customer orders, and we want to process these orders to update inventory and send out notifications. Without exactly-once, a failure during processing might lead to duplicate updates or missed notifications.
Here’s a simplified Kafka Streams setup configured for exactly-once processing:
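```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // This is the key!
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); // Local directory for state store files
StreamsBuilder builder = new StreamsBuilder();
// ... define the topology on builder (for example, the orders stream shown below)
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
```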
The PROCESSING_GUARANTEE_CONFIG set to EXACTLY_ONCE_V2 is the magic here. This tells Kafka Streams to leverage Kafka’s transactional capabilities.
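Exactly-once only pays off if whoever reads your output topics skips records from aborted transactions. A minimal sketch, using only java.util.Properties and the raw config keys that the StreamsConfig constants resolve to, of the Streams settings above plus a downstream consumer configured with isolation.level=read_committed. The group ID "notification-readers" and the deserializer choices are illustrative assumptions; note that a consumer’s default is read_uncommitted, which would also return records from aborted transactions.

```java
import java.util.Properties;

class EosConfigSketch {

    // Streams-side settings, written with the raw keys behind the StreamsConfig constants.
    static Properties streamsProps() {
        Properties p = new Properties();
        p.put("application.id", "exactly-once-app");
        p.put("bootstrap.servers", "localhost:9092");
        p.put("processing.guarantee", "exactly_once_v2");
        p.put("state.dir", "/tmp/kafka-streams");
        // Under exactly-once the default is already 100 ms; each commit ends one transaction.
        p.put("commit.interval.ms", "100");
        return p;
    }

    // A downstream consumer of the output topic. Without read_committed it
    // would also see records written by aborted transactions.
    static Properties downstreamConsumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("group.id", "notification-readers"); // hypothetical group name
        p.put("isolation.level", "read_committed"); // default is read_uncommitted
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }
}
```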
Internally, Kafka Streams achieves exactly-once by wrapping state changes, output writes, and input offset commits in a single Kafka transaction. For each batch of records processed between commits (commit.interval.ms, 100 ms by default under exactly-once), Kafka Streams:
- Reads records from the input topics.
- Updates its state stores (e.g., for aggregations or joins), which also writes each change to the store’s changelog topic.
- Writes any output records to one or more output topics.
- Commits the transaction, which atomically covers the changelog writes, the output writes, and the consumer offsets of the input records. If any step fails, the transaction is aborted, and consumers reading with isolation.level=read_committed never see its records.
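To make the all-or-nothing step concrete, here is a toy in-memory model (plain Java, not Kafka Streams code) of one cycle: a state update, an output record, and the next input offset are staged together and become visible only on commit, while an abort discards all three. The class and method names (TxCycleModel, process, commit, abort) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one exactly-once cycle. NOT Kafka Streams' implementation.
class TxCycleModel {
    final Map<String, Integer> committedState = new HashMap<>();
    final List<String> committedOutput = new ArrayList<>();
    long committedOffset = 0; // next input offset to read

    private final Map<String, Integer> pendingState = new HashMap<>();
    private final List<String> pendingOutput = new ArrayList<>();
    private long pendingOffset = -1;

    // Process one order: stage a stock decrement, a notification, and the next offset.
    void process(long offset, String productId, int qty, String orderId) {
        int current = pendingState.getOrDefault(productId,
                committedState.getOrDefault(productId, 0));
        pendingState.put(productId, current - qty);
        pendingOutput.add("Order " + orderId + " processed.");
        pendingOffset = offset + 1;
    }

    // State, output, and offset become visible together...
    void commit() {
        committedState.putAll(pendingState);
        committedOutput.addAll(pendingOutput);
        if (pendingOffset >= 0) {
            committedOffset = pendingOffset;
        }
        clearPending();
    }

    // ...or none of them do.
    void abort() {
        clearPending();
    }

    private void clearPending() {
        pendingState.clear();
        pendingOutput.clear();
        pendingOffset = -1;
    }
}
```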
Consider this naive snippet for processing an order, updating inventory, and sending a notification:
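```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
orders.peek((key, order) -> {
    // inventoryStore is an object held outside the topology, so this write is never rolled back
    inventoryStore.put(order.getProductId(), inventoryStore.get(order.getProductId()) - order.getQuantity());
    // notificationProducer is a separate KafkaProducer, so this send can repeat if the batch is retried
    notificationProducer.send(new ProducerRecord<>("notifications", order.getCustomerId(),
            "Order " + order.getOrderId() + " processed."));
});
```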
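The peek operator is meant for side effects such as logging, and anything it does outside Kafka Streams (writing to an object held outside the topology, or sending through a separate KafkaProducer) is not part of the Streams transaction. If the task fails and reprocesses the batch, those side effects happen again. To keep output inside the transaction, emit it from the topology itself with to() (converting a KTable with toStream() first if needed); under exactly-once, Kafka Streams writes those records with its own transactional producer, so you don’t create one yourself. Likewise, inventory should live in a Streams-managed state store, for example one built by an aggregation, rather than in an outside object.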
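A toy simulation can show why the peek() version can duplicate notifications while output written through the transaction cannot. It assumes a batch is processed, the task crashes before committing, and the batch is replayed from the last committed offset; one list stands in for a separate producer, another for transactional output. None of this is Kafka API code, and the class name PeekVsToSimulation is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation, not Kafka code.
class PeekVsToSimulation {
    // Stands in for a separate producer used inside peek(): sends take effect immediately.
    final List<String> externalSends = new ArrayList<>();
    // Stands in for output written with to(): visible only once the transaction commits.
    final List<String> committedOutput = new ArrayList<>();
    long committedOffset = 0;

    void runBatch(List<String> orderIds, boolean crashBeforeCommit) {
        List<String> staged = new ArrayList<>();
        // Resume from the last committed input offset.
        for (int i = (int) committedOffset; i < orderIds.size(); i++) {
            String msg = "Order " + orderIds.get(i) + " processed.";
            externalSends.add(msg); // peek-style side effect: never rolled back
            staged.add(msg);        // to()-style output: part of the transaction
        }
        if (crashBeforeCommit) {
            return; // staged output and the offset are discarded (transaction aborted)
        }
        committedOutput.addAll(staged);
        committedOffset = orderIds.size();
    }
}
```

Running runBatch twice on the same one-order list, first with crashBeforeCommit set to true and then to false, leaves two entries in externalSends but only one in committedOutput.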
The real power comes from Kafka’s transactional API. When EXACTLY_ONCE_V2 is enabled, Kafka Streams uses a transactional producer (one per stream thread in v2). It begins a transaction, writes to the changelog topics that back its state stores and to your output topics, adds the input offsets to the same transaction, and then commits. If the application crashes midway, the broker’s transaction coordinator aborts the unfinished transaction (at the latest once transaction.timeout.ms expires), so its records never become visible to read_committed consumers.
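What downstream readers see after such an abort depends on their isolation level. The toy model below (plain Java, with hypothetical names; real brokers use control records and the last stable offset) keeps a log of transactional records plus commit/abort markers and returns either every data record (read_uncommitted) or only records from committed transactions (read_committed).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a partition log with transaction markers. Not broker code.
class TxLogModel {
    static final class Entry {
        final String txnId;
        final String value;  // null for markers
        final String marker; // "COMMIT", "ABORT", or null for data records

        Entry(String txnId, String value, String marker) {
            this.txnId = txnId;
            this.value = value;
            this.marker = marker;
        }
    }

    final List<Entry> log = new ArrayList<>();

    void append(String txnId, String value) { log.add(new Entry(txnId, value, null)); }
    void commit(String txnId) { log.add(new Entry(txnId, null, "COMMIT")); }
    void abort(String txnId) { log.add(new Entry(txnId, null, "ABORT")); }

    List<String> read(boolean readCommitted) {
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if ("COMMIT".equals(e.marker)) committed.add(e.txnId);
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.marker != null) continue; // markers are never handed to applications
            if (!readCommitted || committed.contains(e.txnId)) out.add(e.value);
        }
        return out;
    }
}
```

In this model a transaction left open by a crashed producer behaves like an aborted one for read_committed readers: its records are simply never returned.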
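The state.dir setting points to a directory on the local filesystem where Kafka Streams keeps the on-disk copies of its state stores (for example, RocksDB files) along with checkpoint files. Changelog and repartition topics are not stored there; they live in Kafka like any other topic, and writes to them are part of the transaction. Because the local files can contain updates from a transaction that was later aborted, after an unclean shutdown under exactly-once Kafka Streams discards the local state and rebuilds it from the committed changelog.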
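The rebuild step can be sketched as a toy function (plain Java with hypothetical names; Kafka Streams performs restoration itself): start from an empty store, replay the committed changelog in order so the last write per key wins, and treat a null value as a tombstone that deletes the key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of restoring a key-value store from its committed changelog.
class ChangelogRestore {
    static Map<String, Integer> restore(List<Map.Entry<String, Integer>> committedChangelog) {
        Map<String, Integer> store = new HashMap<>(); // start from a wiped, empty store
        for (Map.Entry<String, Integer> e : committedChangelog) {
            if (e.getValue() == null) {
                store.remove(e.getKey()); // tombstone: the key was deleted
            } else {
                store.put(e.getKey(), e.getValue()); // later writes overwrite earlier ones
            }
        }
        return store;
    }
}
```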
A common misconception is that exactly-once also covers external systems you interact with outside of Kafka topics (like a relational database). It doesn’t: Kafka Streams’ guarantee covers data flowing through Kafka topics and updates to its own state stores. An external database can’t take part in the Kafka transaction, so for external systems you need idempotent writes or your own deduplication and recovery mechanisms.
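One common pattern for the external side is an idempotent write keyed by a unique business ID. The toy sink below (an in-memory stand-in for a database; all names hypothetical) records which order IDs it has already applied and ignores replays, so an order reprocessed after a Streams retry changes stock only once. In a real database, the ID check and the stock update would need to share one database transaction, for example via a unique constraint on the order ID.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy stand-in for an external database with idempotent, order-keyed writes.
class IdempotentInventorySink {
    final Set<String> appliedOrderIds = new HashSet<>();
    final Map<String, Integer> stock = new HashMap<>();

    // Returns true if the order changed stock, false if it was a replayed duplicate.
    boolean apply(String orderId, String productId, int quantity) {
        if (!appliedOrderIds.add(orderId)) {
            return false; // already applied: a replay after a Streams retry is a no-op
        }
        stock.merge(productId, -quantity, Integer::sum);
        return true;
    }
}
```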
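This transactional processing means that if your Kafka Streams application crashes or restarts, it resumes from the input offsets committed with the last successful transaction. Records after that point are processed again, but because the aborted attempt’s state changes and output were never committed, each record’s effects appear exactly once.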
The next hurdle you’ll likely encounter is understanding how to handle idempotent writes to external databases within a transactional Kafka Streams application.