Kafka Streams can process real-time data, but its true power lies in how it models stateful computations, transforming simple stream processing into complex event processing for applications like fraud detection.
Let’s watch a simplified fraud detection scenario unfold. Imagine we have two Kafka topics: transactions and account_balances.
transactions topic:
{"account_id": "acc123", "amount": 100.00, "timestamp": 1678886400}
{"account_id": "acc456", "amount": 50.00, "timestamp": 1678886405}
{"account_id": "acc123", "amount": 200.00, "timestamp": 1678886410}
{"account_id": "acc123", "amount": 150.00, "timestamp": 1678886415}
account_balances topic:
{"account_id": "acc123", "balance": 1000.00}
{"account_id": "acc456", "balance": 500.00}
Our Kafka Streams application will:
- Read transactions.
- Join each transaction with the current account balance for that account.
- Aggregate transactions per account over a 5-minute window.
- Flag accounts that have a total transaction amount exceeding 500 in that window.
Here’s a snippet of the Kafka Streams DSL code:
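import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
// JsonSerde and the Transaction, AccountBalance, and EnrichedTransaction classes
// are application-provided and not shown here.
StreamsBuilder builder = new StreamsBuilder();
// 1. Read transactions
KStream<String, Transaction> transactionStream = builder.stream(
    "transactions",
    Consumed.with(Serdes.String(), new JsonSerde<>(Transaction.class))
);
// 2. Get account balances (as a GlobalKTable for efficient lookups)
GlobalKTable<String, AccountBalance> balanceTable = builder.globalTable(
    "account_balances",
    Consumed.with(Serdes.String(), new JsonSerde<>(AccountBalance.class))
);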
// Join each transaction with its account balance.
// A KStream-GlobalKTable join is not windowed and takes no serdes: it needs a
// key selector that maps each record to a table key, plus a value joiner.
KStream<String, EnrichedTransaction> enrichedStream = transactionStream.join(
    balanceTable,
    (key, transaction) -> transaction.getAccountId(), // Lookup key into balanceTable
    (transaction, balance) -> new EnrichedTransaction(transaction, balance)
);
// 3. Sum transaction amounts per account over a 5-minute tumbling window
KTable<Windowed<String>, Double> windowedTotals = enrichedStream
    .map((key, enrichedTx) -> new KeyValue<>(
        enrichedTx.getTransaction().getAccountId(),
        enrichedTx.getTransaction().getAmount()))
    // Re-keying forces a repartition, so state the serdes explicitly
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
    // TimeWindows.of is deprecated since Kafka 3.0 in favor of
    // ofSizeWithNoGrace / ofSizeAndGrace
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
        () -> 0.0, // Initial value
        (key, amount, currentSum) -> currentSum + amount, // Aggregator
        Materialized.with(Serdes.String(), Serdes.Double())
    );
// 4. Flag suspicious accounts
windowedTotals
    .toStream()
    .filter((windowedKey, totalAmount) -> totalAmount > 500.00)
    .map((windowedKey, totalAmount) -> new KeyValue<>(windowedKey.key(), totalAmount)) // Unwrap windowed key
    .to("suspicious_transactions", Produced.with(Serdes.String(), Serdes.Double()));
This code defines a computation graph. Kafka Streams compiles it into a processor topology and backs the stateful steps with state stores, which can also be read from outside the topology through interactive queries. When a Transaction arrives, its account balance is first looked up in balanceTable (a GlobalKTable, which is fully replicated to every Kafka Streams instance). Then the transaction’s amount is added to a running sum for that account, keyed by account_id and windowed by time. If the sum within a 5-minute window exceeds 500, the account and its total are written to the suspicious_transactions topic.
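To make that flow concrete, here is a minimal plain-Java sketch that mimics the pipeline on the sample records from the two topics, with no Kafka involved. It is a simplified model, not how Kafka Streams is implemented: the class FraudPipelineModel, its methods, and the extra 75.00 transaction are hypothetical, and the sample timestamps are assumed to be epoch seconds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the pipeline; all names are illustrative.
final class FraudPipelineModel {
    static final long WINDOW_SECONDS = 300;  // 5-minute tumbling window
    static final double THRESHOLD = 500.00;

    static final class Transaction {
        final String accountId;
        final double amount;
        final long timestampSeconds; // assumed to be epoch seconds

        Transaction(String accountId, double amount, long timestampSeconds) {
            this.accountId = accountId;
            this.amount = amount;
            this.timestampSeconds = timestampSeconds;
        }
    }

    // Returns one "account@windowStart=total" entry for every update whose
    // running total exceeds the threshold.
    static List<String> run(List<Transaction> transactions, Map<String, Double> balances) {
        Map<String, Double> totals = new LinkedHashMap<>(); // stands in for the window store
        List<String> alerts = new ArrayList<>();
        for (Transaction tx : transactions) {
            // Inner join against the balance table: no balance, no output.
            if (!balances.containsKey(tx.accountId)) {
                continue;
            }
            // Align the window start to a multiple of the window size.
            long windowStart = tx.timestampSeconds - (tx.timestampSeconds % WINDOW_SECONDS);
            String storeKey = tx.accountId + "@" + windowStart;
            double total = totals.merge(storeKey, tx.amount, Double::sum);
            if (total > THRESHOLD) {
                alerts.add(storeKey + "=" + total);
            }
        }
        return alerts;
    }

    static Map<String, Double> sampleBalances() {
        return Map.of("acc123", 1000.00, "acc456", 500.00);
    }

    static List<Transaction> sampleTransactions() {
        return List.of(
            new Transaction("acc123", 100.00, 1678886400L),
            new Transaction("acc456", 50.00, 1678886405L),
            new Transaction("acc123", 200.00, 1678886410L),
            new Transaction("acc123", 150.00, 1678886415L));
    }

    public static void main(String[] args) {
        // The four sample transactions only bring acc123 to 450.00.
        System.out.println(run(sampleTransactions(), sampleBalances())); // prints []

        // Hypothetical fifth transaction that pushes acc123 past the threshold.
        List<Transaction> extended = new ArrayList<>(sampleTransactions());
        extended.add(new Transaction("acc123", 75.00, 1678886420L));
        System.out.println(run(extended, sampleBalances())); // prints [acc123@1678886400=525.0]
    }
}
```

With only the sample data, acc123 totals 450.00 inside its window, so nothing is flagged. The alert appears once the hypothetical fifth transaction lifts the total to 525.00.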
The core problem this solves is stateful stream processing. Unlike stateless operations (like filtering or mapping individual records), this requires maintaining and updating state (the running sum of transactions per account per window). Kafka Streams handles this by creating local, persistent state stores (RocksDB by default) on each application instance. These stores are backed by Kafka changelog topics, so their contents can be rebuilt after a failure or a rebalance. Exactly-once processing builds on the same mechanism, but it is opt-in: the default guarantee is at-least-once, and exactly-once has to be enabled through the processing.guarantee setting.
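As a rough sketch of what turning on exactly-once looks like, the configuration below uses the plain string keys rather than the StreamsConfig constants so that it compiles without the Kafka libraries. The class name StreamsSettings, the application id, and the broker address are placeholders, and the "exactly_once_v2" value assumes Kafka Streams 3.0 or later.

```java
import java.util.Properties;

// Hypothetical helper; the id and address passed in main are placeholders.
final class StreamsSettings {
    static Properties exactlyOnceConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // The default is "at_least_once"; exactly-once must be requested explicitly.
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }

    public static void main(String[] args) {
        Properties props = exactlyOnceConfig("fraud-detector", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee")); // prints exactly_once_v2
    }
}
```

The resulting Properties object would be passed to the KafkaStreams constructor together with the built topology.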
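The windowedBy(TimeWindows.of(Duration.ofMinutes(5))) is the critical piece for temporal analysis. It groups events not just by key but also by time. Without an advanceBy(...) call, these are tumbling windows: fixed-size, epoch-aligned, and non-overlapping. For each unique account_id, Kafka Streams maintains separate state for each 5-minute window, and a new transaction lands in exactly one window based on its timestamp. Overlapping (hopping) windows, where a record can belong to several windows at once, require an explicit advance interval smaller than the window size.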
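The window assignment itself is simple arithmetic. The sketch below is an illustration of the epoch-aligned rule in plain Java, not the library’s own code; the class and method names are hypothetical, and the sample timestamp is assumed to be epoch seconds converted to milliseconds. Setting the advance equal to the size gives tumbling windows, and a smaller advance gives hopping windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of time-window assignment; not Kafka Streams code.
final class WindowAssignment {
    // Returns the start of every window [start, start + sizeMs) containing the timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start that can still contain the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long timestampMs = 1678886410L * 1000; // third sample transaction
        long fiveMinutes = 5 * 60 * 1000L;
        long oneMinute = 60 * 1000L;

        // Tumbling: advance == size, so the record falls into exactly one window.
        System.out.println(windowStartsFor(timestampMs, fiveMinutes, fiveMinutes));
        // prints [1678886400000]

        // Hopping: a 1-minute advance puts the same record into five windows.
        System.out.println(windowStartsFor(timestampMs, fiveMinutes, oneMinute).size());
        // prints 5
    }
}
```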
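What many people don’t realize is how Kafka Streams handles the end of a window. By default it does not wait for a window to close before producing output: each update to a window’s aggregate is forwarded downstream as records arrive (subject to record caching and the commit interval). In the example above, that means the filter can emit more than one alert for the same account and window once the total passes 500. To get a single final result per window, add a suppress(Suppressed.untilWindowCloses(...)) step after the aggregation. Window state is not dropped the moment a window ends, either. It stays in the local store for the store’s retention period, which covers at least the window size plus the grace period, and is then expired. This is crucial for preventing unbounded state growth in long-running applications. The changelog topic for a windowed store is also subject to time-based retention, so it holds what is needed to rebuild the store rather than an indefinite history of aggregations.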
The next concept you’ll likely grapple with is handling late-arriving data and its impact on windowed aggregations.