Kafka Streams can produce surprising results when events arrive out of order, especially if you rely on strict event-time processing.
Let’s watch Kafka Streams handle late data.
Imagine you have a Kafka topic orders with events like:
{"order_id": "123", "item": "apple", "timestamp": 1678886400000}
{"order_id": "456", "item": "banana", "timestamp": 1678886460000}
{"order_id": "123", "item": "orange", "timestamp": 1678886520000}
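Suppose you want to aggregate these by order_id to count the items in each order. Now suppose a record for order_id 123 with an earlier timestamp arrives after the record for order_id 456. A plain running count comes out the same in any order. Windowed aggregations, and any logic that depends on event order, can miss or misplace that record if you have not planned for it.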
Here’s a simple Kafka Streams topology that aggregates order items:
StreamsBuilder builder = new StreamsBuilder();
// Assumes records on "orders" are keyed by order_id; JsonSerde is taken to be
// Spring Kafka's org.springframework.kafka.support.serializer.JsonSerde
builder.stream("orders", Consumed.with(Serdes.String(), new JsonSerde<>()))
    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>()))
    .aggregate(
        () -> 0L,                                 // Initializer: each order starts at 0
        (key, value, aggregate) -> aggregate + 1, // Aggregator: +1 per item record
        Materialized.with(Serdes.String(), Serdes.Long())
    )
    .toStream()
    .to("order_counts", Produced.with(Serdes.String(), Serdes.Long()));
This topology reads from orders, groups by order_id (the record key), and counts the number of items per order. A record is late when its timestamp is older than the current stream time, which is the largest record timestamp the task has processed so far. This non-windowed aggregation does not drop late records, because a running count is the same in any order. Kafka Streams drops late records in windowed operations, once stream time has moved past the record's window end plus that window's grace period.
The core problem is that arrival order and event-time order can differ. Kafka Streams tracks stream time per task as the maximum record timestamp seen on that task's input partitions, and it judges lateness against that value. Out-of-order records are tolerated, but only within limits. Windowed aggregations and joins accept a late record only while its window is still open. Any logic whose result depends on processing order (for example, "keep the latest item per order") can end up with a different result than in-order processing would give.
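To make the stream-time rule concrete, here is a simplified model in plain Java with no Kafka dependency. It shows how a tumbling-window aggregation could decide whether an out-of-order record still counts. It only mimics the documented rule that a window accepts records until stream time reaches the window end plus the grace period. It is not Kafka Streams' implementation, and `LatenessModel` is a hypothetical name. The model assumes epoch-aligned tumbling windows and uses 1-minute windows to match the sample timestamps.

```java
/**
 * Simplified, hypothetical model of the lateness rule for tumbling windows.
 * Not Kafka Streams code: it only mimics the documented behavior that a window
 * accepts records until stream time reaches window end + grace period.
 */
public class LatenessModel {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // largest timestamp seen so far

    public LatenessModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true if the record is added to its window, false if the window has closed. */
    public boolean process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);              // stream time never moves backwards
        long windowStart = timestamp - (timestamp % windowSizeMs); // epoch-aligned tumbling window
        long windowEnd = windowStart + windowSizeMs;               // exclusive end
        return windowEnd + graceMs > streamTime;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Arrival order: banana (t0 + 1 min), then apple (t0, late), then orange (t0 + 2 min).
        long[] arrivals = {1678886460000L, 1678886400000L, 1678886520000L};
        LatenessModel noGrace = new LatenessModel(minute, 0L);
        LatenessModel withGrace = new LatenessModel(minute, minute);
        for (long ts : arrivals) {
            System.out.println(ts + " noGrace=" + noGrace.process(ts)
                    + " grace1min=" + withGrace.process(ts));
        }
    }
}
```

Take the case where the record for order 123 (apple, t0) arrives after the record for order 456 (banana, t0 + 1 min). With no grace, the model rejects apple, because banana has already moved stream time to t0 + 1 min, which is exactly the end of apple's window [t0, t0 + 1 min). With a one-minute grace period, that window stays open until stream time reaches t0 + 2 min, so apple is counted. This assumes all three records land in the same partition, because stream time is tracked per task.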
The fix usually lies in how you define windows, and specifically in their grace period. Several configuration and environment problems can also make records go missing in ways that look like late-data drops.
Here are the common causes and their fixes:
- Misreading processing.guarantee: Kafka Streams supports at_least_once (the default) and exactly_once_v2. Older releases also accept the deprecated exactly_once and exactly_once_beta. There is no at_most_once option, and neither guarantee decides whether a late record is kept; window grace periods do that. The guarantee controls whether a crash can make a record count twice.
  - Diagnosis: Check processing.guarantee in your StreamsConfig. Pay particular attention if counts come out too high after a restart or rebalance, rather than too low.
  - Fix: Use exactly-once if duplicate counting after failures is unacceptable. exactly_once_v2 requires brokers on version 2.5 or newer:
    processing.guarantee=exactly_once_v2
  - Why it works: Under exactly_once_v2, each commit's input offsets, state store changelog updates, and output records go into a single Kafka transaction. A record reprocessed after a crash therefore does not leave a second update behind. Late records are still subject to the window's grace period under either guarantee.
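The difference between the two guarantees can be illustrated with a toy model in plain Java. This is not Kafka code, and `ReplayModel` and its parameters are hypothetical. The model counts a list of record keys, commits the input offset every `commitInterval` records, crashes after `crashAfter` records, and restarts from the last committed offset. The at-least-once branch assumes that state updates made after the last commit survived the crash, as they can when they have already reached the changelog. The exactly-once branch restores state as of the last commit, which is the effect of committing offsets and state together.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not Kafka code) of crash-and-replay under two commit styles. */
public class ReplayModel {

    public static Map<String, Long> run(List<String> keys, int crashAfter, int commitInterval,
                                        boolean atomicCommit) {
        Map<String, Long> live = new HashMap<>();      // state as updated record by record
        Map<String, Long> committed = new HashMap<>(); // state as of the last commit
        int committedOffset = 0;                       // where reading resumes after a restart

        // First run: apply records until the simulated crash, committing periodically.
        for (int offset = 0; offset < crashAfter; offset++) {
            live.merge(keys.get(offset), 1L, Long::sum);
            if ((offset + 1) % commitInterval == 0) {
                committedOffset = offset + 1;
                committed = new HashMap<>(live);
            }
        }

        // Restart. At-least-once: uncommitted updates survived, but the offset rewinds.
        // Exactly-once: state and offset rewind together to the last commit.
        Map<String, Long> state = atomicCommit ? committed : live;
        for (int offset = committedOffset; offset < keys.size(); offset++) {
            state.merge(keys.get(offset), 1L, Long::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("123", "456", "123", "123");
        System.out.println("at-least-once: " + run(keys, 3, 2, false));
        System.out.println("exactly-once:  " + run(keys, 3, 2, true));
    }
}
```

In the example, the third record is applied, the application crashes before the next commit, and that record is read again on restart. Order 123 ends up at 4 in the at-least-once branch but at the correct 3 in the exactly-once branch.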
- Deserialization and production exception handlers: Suppose a record cannot be deserialized, for example because its payload does not match the format your deserializer expects. Then default.deserialization.exception.handler decides what happens. The default, LogAndFailExceptionHandler, stops processing; LogAndContinueExceptionHandler logs the record and skips it. A skipped record never reaches the aggregation, so it can look exactly like a dropped late record. Lateness alone does not trigger this path, because deserialization does not depend on timestamps.
  - Diagnosis: Inspect logs for deserialization and production errors related to your topics, and check the task-level dropped-records metrics.
  - Fix: Configure the handlers deliberately. If logging and skipping malformed records is acceptable for your use case:
    default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.production.exception.handler=org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
  - Why it works: LogAndContinueExceptionHandler logs and skips a record that fails deserialization instead of crashing the application. DefaultProductionExceptionHandler, the default, fails on errors writing output, so write failures surface instead of silently losing results. Neither setting changes how late records are handled. Together they ensure that malformed or unwritable records are not mistaken for lateness.
- Windows without enough grace: In windowed operations, a record is added to its window only while that window is open, that is, until stream time passes the window end plus the window's grace period. Records that arrive after that are dropped. No StreamsConfig property enables late-record handling or sets a global grace period; the grace period is part of each window definition. Since Kafka Streams 3.0, TimeWindows.ofSizeWithNoGrace(...) means zero grace and TimeWindows.ofSizeAndGrace(...) sets the grace explicitly. The deprecated TimeWindows.of(...) kept an implicit 24-hour grace period.
  - Diagnosis: If your windowed aggregations are missing data that you know was produced, check the dropped-records metrics. Then check how the window's grace period is defined: look for ofSizeWithNoGrace, or a grace period shorter than your real delays.
  - Fix: Give the window an explicit grace period that covers the lateness you expect:
    // Order is a hypothetical POJO for the JSON payload; JsonSerde is assumed to be Spring Kafka's
    builder.stream("orders", Consumed.with(Serdes.String(), new JsonSerde<>(Order.class)))
        .filter((key, order) -> order.getTimestamp() != null) // drop records without a timestamp field
        .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
        // 5-minute tumbling windows that accept late records for 1 more minute of stream time
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
        .aggregate(
            () -> 0L,
            (key, order, aggregate) -> aggregate + 1,
            Materialized.with(Serdes.String(), Serdes.Long()))
        // Hold back intermediate updates; emit one final count per window once it closes
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        // The key is now Windowed<String>; flatten it so a String serde can write it
        .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
        .to("order_counts_windowed", Produced.with(Serdes.String(), Serdes.Long()));
  - Why it works: With a one-minute grace period, a late record is still counted as long as stream time has not passed its window's end plus one minute. suppress(Suppressed.untilWindowCloses(...)) does not admit late records itself. It holds back intermediate updates and emits one final count per window after the grace period ends. Downstream consumers therefore see a single result that already includes any late records accepted during the grace period. Windows close based on stream time, so a window's final result is emitted only after newer records advance stream time past its close point.
- Misconfigured state.dir or permissions: Aggregations keep their state in local state stores under state.dir, including windows held open during the grace period. If Kafka Streams cannot read and write that directory, it cannot maintain that state.
  - Diagnosis: Check logs for IOException or FileNotFoundException related to state.dir, possibly wrapped in a Kafka Streams exception.
  - Fix: Point state.dir at a persistent location, and make sure the user running the Kafka Streams application has read/write permissions to it:
    state.dir=/path/to/your/kafka-streams-state
  - Why it works: Kafka Streams relies on local state stores for aggregations, joins, and windowed operations. If a store is inaccessible or corrupted, processing fails or the store has to be rebuilt from its changelog topic. Until that is fixed, windowed results cannot be maintained reliably, including the late records a grace period is supposed to absorb.
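You can rule out permission problems before starting the application with a pre-flight check like the one below. `StateDirCheck` is a hypothetical helper, not part of Kafka. It creates the directory if it is missing, as Kafka Streams would, and then attempts a real write, because permission bits alone can be misleading on read-only mounts.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical pre-flight check for a Kafka Streams state.dir; not part of Kafka. */
public class StateDirCheck {

    /** Returns null if the directory is usable, otherwise a description of the problem. */
    public static String check(Path dir) {
        try {
            Files.createDirectories(dir); // creates missing parents; fails if dir is a regular file
        } catch (IOException e) {
            return "cannot create " + dir + ": " + e;
        }
        if (!Files.isReadable(dir) || !Files.isWritable(dir)) {
            return dir + " is not readable and writable by this user";
        }
        // Permission bits can be misleading (e.g. read-only mounts), so try a real write.
        try {
            Path probe = Files.createTempFile(dir, "state-dir-probe", ".tmp");
            Files.delete(probe);
        } catch (IOException e) {
            return "cannot write to " + dir + ": " + e;
        }
        return null;
    }

    public static void main(String[] args) {
        Path dir = Paths.get(args.length > 0 ? args[0] : "/path/to/your/kafka-streams-state");
        String problem = check(dir);
        System.out.println(problem == null ? "state.dir OK: " + dir : "state.dir problem: " + problem);
    }
}
```

Run it as the same user and with the same path you configure in state.dir, so that it checks the permissions the application will actually have.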
- Clock Skew or Incorrect System Time: With the default extractor, a record's timestamp comes from the producer's clock (CreateTime) or the broker's clock (LogAppendTime). If those clocks drift, records carry timestamps that do not match the real event order. A producer whose clock runs ahead is especially harmful. Its records push stream time forward, which can close windows early and cause correctly timed records from other producers to be dropped as late.
  - Diagnosis: Compare timestamps of messages produced and consumed across different nodes. Check each host's time synchronization status, for example with timedatectl on systemd hosts or in NTP/chrony logs.
  - Fix: Synchronize every host that produces records, runs Kafka brokers, or runs Kafka Streams applications with a reliable Network Time Protocol (NTP) service:
    # On Debian/Ubuntu: install and start ntpd (chrony is a common alternative)
    sudo apt-get install ntp
    sudo systemctl start ntp
  - Why it works: Kafka Streams uses record timestamps for ordering and windowing. When clocks are synchronized, events that happened in order also carry timestamps in that order, so they are not misclassified as late.
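One way to spot skew from the consuming side is to compare each record's timestamp with the local clock at the moment the record is read. The sketch below is a hypothetical helper, `SkewProbe`, not a Kafka API. In a real consumer loop you would feed it `record.timestamp()` from a `ConsumerRecord` together with `System.currentTimeMillis()`. It assumes the consuming host's own clock is synchronized. Only timestamps in the future point to skew; large positive delays may simply be consumer lag.

```java
/**
 * Hypothetical diagnostic (not a Kafka API): tracks how far record timestamps are
 * from the local clock at consumption time, on a host whose clock is NTP-synced.
 */
public class SkewProbe {
    private long minDelayMs = Long.MAX_VALUE;
    private long maxDelayMs = Long.MIN_VALUE;

    /** delay = local consumption time minus record timestamp; negative means "from the future". */
    public void observe(long recordTimestampMs, long consumedAtMs) {
        long delay = consumedAtMs - recordTimestampMs;
        minDelayMs = Math.min(minDelayMs, delay);
        maxDelayMs = Math.max(maxDelayMs, delay);
    }

    /** True if some record claimed to be newer than local time by more than the tolerance. */
    public boolean sourceClockAhead(long toleranceMs) {
        return minDelayMs < -toleranceMs;
    }

    public long minDelayMs() { return minDelayMs; }

    public long maxDelayMs() { return maxDelayMs; }

    public static void main(String[] args) {
        SkewProbe probe = new SkewProbe();
        long now = 1678886520000L;
        probe.observe(1678886400000L, now); // 2 minutes old: ordinary lag
        probe.observe(now + 90_000L, now);  // 90 s in the future: suspicious
        System.out.println("min delay ms = " + probe.minDelayMs()
                + ", source clock ahead? " + probe.sourceClockAhead(5_000L));
    }
}
```

The tolerance absorbs small, harmless differences. A consistently negative minimum delay from one producer is a strong hint that its clock, rather than the data, is to blame.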
- Incorrect TimestampExtractor: If you have implemented a custom TimestampExtractor that does not parse or return event timestamps correctly, Kafka Streams uses the wrong time reference and misjudges lateness.
  - Diagnosis: Review your custom TimestampExtractor implementation. Log the extracted timestamps to verify they are correct.
  - Fix: Make sure your TimestampExtractor returns the event timestamp from your record. The built-in extractors all read the Kafka record timestamp. FailOnInvalidTimestamp (the default) throws on a negative timestamp, LogAndSkipOnInvalidTimestamp logs and skips the record, and UsePartitionTimeOnInvalidTimestamp substitutes the partition time. If you rely on one of them, make sure producers set valid timestamps.
    // MyEvent stands for your own event class
    public class MyTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            if (record.value() instanceof MyEvent) {
                return ((MyEvent) record.value()).getEventTimestamp();
            }
            // Fall back to the record's Kafka timestamp (producer or broker time), not the
            // local wall clock, so unexpected record types don't jump to "now"
            return record.timestamp();
        }
    }
    // Register it for every source in the application's properties:
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);
    // ...or only for one input topic:
    builder.stream("orders", Consumed.with(Serdes.String(), new JsonSerde<>())
        .withTimestampExtractor(new MyTimestampExtractor()));
  - Why it works: The TimestampExtractor is what tells Kafka Streams what time an event belongs to. An incorrect extractor misinforms the engine about each event's position in time, so it treats timely events as late or vice versa.
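When you log extracted timestamps, a small check like the one below can flag the usual culprits. `TimestampSanity` is a hypothetical helper, not part of Kafka. It catches three cases: negative values, which the built-in extractors treat as invalid; values that look like epoch seconds rather than milliseconds; and values far ahead of the local clock. The 2001 cut-off for "too small to be milliseconds" is an assumption that suits live event data, not replays of historical data.

```java
import java.time.Instant;

/** Hypothetical sanity check for timestamps returned by a custom TimestampExtractor. */
public class TimestampSanity {
    // 1_000_000_000_000 ms is 2001-09-09T01:46:40Z. Live event timestamps below this
    // usually mean the field holds epoch seconds rather than epoch milliseconds.
    static final long MIN_PLAUSIBLE_MS = 1_000_000_000_000L;

    public static String describe(long extractedMs, long nowMs, long maxFutureMs) {
        if (extractedMs < 0) {
            return "invalid: negative timestamp " + extractedMs;
        }
        if (extractedMs < MIN_PLAUSIBLE_MS) {
            return "suspicious: " + Instant.ofEpochMilli(extractedMs)
                    + " (epoch seconds instead of milliseconds?)";
        }
        if (extractedMs - nowMs > maxFutureMs) {
            return "suspicious: " + Instant.ofEpochMilli(extractedMs) + " is in the future";
        }
        return "ok: " + Instant.ofEpochMilli(extractedMs);
    }

    public static void main(String[] args) {
        long now = 1678886520000L;
        System.out.println(describe(1678886400000L, now, 60_000L)); // sample "apple" order, in ms
        System.out.println(describe(1678886400L, now, 60_000L));    // same instant written in seconds
    }
}
```

The second call reports a date in January 1970. Read as milliseconds, the seconds value 1678886400 is only about 19 days after the epoch, so every record parsed this way would look hopelessly late.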
The next error you’ll likely hit after correctly handling late data is related to state store re-initialization during application restarts, particularly if you’re using exactly_once processing and need to ensure idempotency across restarts.