Out-of-order events need deliberate handling in Kafka Streams, especially when your results depend on event time (for example, windowed aggregations).

Let’s watch Kafka Streams handle late data.

Imagine you have a Kafka topic orders with events like:

{"order_id": "123", "item": "apple", "timestamp": 1678886400000}
{"order_id": "456", "item": "banana", "timestamp": 1678886460000}
{"order_id": "123", "item": "orange", "timestamp": 1678886520000}

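Suppose you want to aggregate these by order_id to count the items per order. If a record for order 123 arrives after the record for order 456 even though its timestamp is earlier, it is late relative to what has already been processed from that partition, and an event-time aggregation can produce wrong or incomplete results unless you plan for it.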
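To see what "late" means concretely, here is a simplified model in plain Java (not the Kafka Streams API) of how stream time moves for a single partition: it only moves forward, to the largest timestamp processed so far, and any record with a smaller timestamp is out of order. The pear event is hypothetical, added to the three sample events to show one late arrival, and the model assumes all four records land on the same partition. It needs Java 16 or newer for the record type.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of per-partition stream time (not the Kafka Streams API).
final class StreamTimeModel {

    record Event(String orderId, String item, long timestamp) {}

    /** Returns the events whose timestamp is older than the stream time at the moment they arrive. */
    static List<Event> outOfOrder(List<Event> arrivalOrder) {
        List<Event> late = new ArrayList<>();
        long streamTime = Long.MIN_VALUE; // nothing processed yet
        for (Event e : arrivalOrder) {
            if (e.timestamp() < streamTime) {
                late.add(e);                // older than a record already processed
            } else {
                streamTime = e.timestamp(); // stream time only moves forward
            }
        }
        return late;
    }

    public static void main(String[] args) {
        List<Event> arrivals = List.of(
            new Event("123", "apple", 1678886400000L),
            new Event("456", "banana", 1678886460000L),
            new Event("123", "orange", 1678886520000L),
            // Hypothetical late event: happened before banana, but arrives last
            new Event("123", "pear", 1678886430000L));
        for (Event e : outOfOrder(arrivals)) {
            System.out.println("late: " + e.item() + " at " + e.timestamp());
        }
    }
}
```

Only the pear record is reported: by the time it arrives, stream time has already advanced to the orange record's timestamp.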

Here’s a simple Kafka Streams topology that aggregates order items:

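import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.kafka.support.serializer.JsonSerde; // assumed to be Spring Kafka's JsonSerde

StreamsBuilder builder = new StreamsBuilder();
// Assumes records on "orders" are already keyed by order_id
builder.stream("orders", Consumed.with(Serdes.String(), new JsonSerde<>()))
    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>()))
    .aggregate(
        () -> 0L, // Initializer: every order starts at zero
        (key, value, aggregate) -> aggregate + 1, // Aggregator: one more item for this order
        Materialized.with(Serdes.String(), Serdes.Long())
    )
    .toStream()
    .to("order_counts", Produced.with(Serdes.String(), Serdes.Long()));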

This topology reads from orders, groups by key (it assumes the records are already keyed by order_id), and counts the records per order. A record is late when its timestamp is older than the current stream time, which Kafka Streams tracks as the largest timestamp processed so far. A non-windowed aggregation like this one does not drop late records; it applies them in arrival order. Windowed operations are different: once stream time passes a window's end plus its grace period, the window is closed, and Kafka Streams drops any record that would still fall into it.
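The window rule can be sketched the same way. This is a simplified model, not Kafka Streams code: it assumes tumbling windows (advance equal to size), non-negative timestamps, and the rule, as we understand Kafka Streams' windowed aggregation, that a record is accepted only while its window end is greater than stream time minus the grace period. The 5-minute window, 1-minute grace period, and late pear event are illustrative values.

```java
// Simplified model of tumbling-window lateness (not the Kafka Streams implementation).
final class GracePeriodModel {

    /** Start of the tumbling window that contains ts; assumes ts >= 0. */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /**
     * True if the record still lands in an open window. A window stays open until
     * stream time reaches windowEnd + grace; after that, records for it are dropped.
     */
    static boolean accepted(long recordTs, long streamTime, long sizeMs, long graceMs) {
        long observed = Math.max(streamTime, recordTs); // stream time never moves backwards
        long windowEnd = windowStart(recordTs, sizeMs) + sizeMs;
        return windowEnd > observed - graceMs;
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L;    // 5-minute windows
        long grace = 60_000L;       // 1-minute grace period
        long pear = 1678886430000L; // hypothetical late event; window [1678886400000, 1678886700000)

        System.out.println(accepted(pear, 1678886730000L, size, grace)); // true: within the grace period
        System.out.println(accepted(pear, 1678886730000L, size, 0L));    // false: no grace, window ended
        System.out.println(accepted(pear, 1678886760000L, size, grace)); // false: end + grace reached
    }
}
```

The first two calls differ only in the grace period: 30 seconds after the window ends, the late record is still accepted with a 1-minute grace period but dropped without one.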

The core problem is that Kafka Streams processes each partition in offset order, not timestamp order, and only ever moves stream time forward. When it encounters a record whose timestamp is older than the current stream time (the maximum timestamp seen so far for that partition), the record may belong to a window that has already closed, or it may be applied after newer data. This is particularly problematic for aggregations and joins whose results depend on the order of operations, such as a reduction that keeps the most recent value; a plain count like the one above is unaffected by order.
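The difference between order-insensitive and order-sensitive aggregations shows up in a small model. The sketch below is plain Java, not Kafka Streams: lastArrived mirrors what a reducer such as reduce((oldValue, newValue) -> newValue) does on a grouped stream, while newestByEventTime compares timestamps so that a late record cannot overwrite newer data. The late pear event is hypothetical.

```java
import java.util.List;

// Simplified model of per-key aggregation order sensitivity (not the Kafka Streams API).
final class OrderSensitivityModel {

    record Event(String item, long timestamp) {}

    /** Mirrors the counting aggregator: each record adds one, so arrival order cannot matter. */
    static long count(List<Event> arrivals) {
        long aggregate = 0L;
        for (Event ignored : arrivals) {
            aggregate = aggregate + 1;
        }
        return aggregate;
    }

    /** Last-writer-wins by arrival: a late record overwrites newer data. */
    static Event lastArrived(List<Event> arrivals) {
        Event current = null;
        for (Event e : arrivals) {
            current = e;
        }
        return current;
    }

    /** Event-time aware: keeps the record with the newest timestamp, whatever the arrival order. */
    static Event newestByEventTime(List<Event> arrivals) {
        Event current = null;
        for (Event e : arrivals) {
            if (current == null || e.timestamp() >= current.timestamp()) {
                current = e;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // Order 123 as it arrives: apple, orange, then the hypothetical late pear.
        List<Event> arrivals = List.of(
            new Event("apple", 1678886400000L),
            new Event("orange", 1678886520000L),
            new Event("pear", 1678886430000L));
        System.out.println(count(arrivals));                    // 3
        System.out.println(lastArrived(arrivals).item());       // pear
        System.out.println(newestByEventTime(arrivals).item()); // orange
    }
}
```

The count is 3 in any arrival order, but the naive "latest value" ends up as pear even though orange is the newest event.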

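The solution lies in telling Kafka Streams how to read event time and how long to wait for late records, chiefly through the TimestampExtractor and window grace periods, and in ruling out configuration and infrastructure problems that make records look late or disappear.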

Here are the common causes and their fixes:

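  1. Processing guarantee: Kafka Streams has no at-most-once mode. processing.guarantee accepts at_least_once (the default) or exactly_once_v2 (older releases also used exactly_once and exactly_once_beta), and neither setting changes how late records are treated. The guarantee decides whether a failure can cause a record to be processed twice, which can look like a miscount but is not a lateness problem.

    • Diagnosis: Check your StreamsConfig for processing.guarantee, and check whether wrong counts line up with restarts or rebalances rather than with late records.
    • Fix: If duplicates after failures are the symptom, switch to exactly-once processing (exactly_once_v2 requires brokers on version 2.5 or newer).
      processing.guarantee=exactly_once_v2
      
    • Why it works: With exactly-once, the consumed offsets, the state-store changelog writes, and the output records are committed in a single Kafka transaction, so a crash cannot apply the same input twice. It does not buffer or reorder records, so it cannot rescue a record that a closed window has already rejected.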
  2. default.deserialization.exception.handler and default.production.exception.handler: Lateness alone never makes a record fail deserialization, but a malformed record can, and the configured handler decides whether that record stops the application or quietly disappears. If the issue is purely late data rather than malformed data, this is unlikely to be the direct cause.

    • Diagnosis: Inspect the logs for deserialization or production exceptions on your topics. The default LogAndFailExceptionHandler stops the stream thread on a bad record, while LogAndContinueExceptionHandler logs it and skips it.
    • Fix: Choose the behavior explicitly. To log and skip records that cannot be deserialized, and to keep failing loudly when output cannot be written:
      default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
      default.production.exception.handler=org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
      
    • Why it works: LogAndContinueExceptionHandler keeps processing going when a single record cannot be deserialized. It does not recover that record; it logs and skips it, so the loss is visible in the logs instead of crashing the application. DefaultProductionExceptionHandler is already the default and fails on send errors rather than dropping output; write a custom ProductionExceptionHandler only if you really want to skip failed sends.
  3. Windowing without a grace period: In windowed operations, the grace period is what admits late records. It is not a StreamsConfig property; it is part of the window definition. With TimeWindows.ofSizeWithNoGrace(...), a record is dropped as soon as stream time has passed the end of its window (the deprecated TimeWindows.of(...) instead fell back to a default grace period of about 24 hours).

    • Diagnosis: If windowed aggregations are missing data that you know arrived, look for warnings about records skipped for an expired window and check the task-level dropped-records metrics, then compare how late your records actually arrive with the grace period you configured.
    • Fix: Give the window a grace period that covers your expected lateness, and suppress intermediate results if downstream consumers need exactly one final value per window:
      builder.stream("orders", Consumed.with(Serdes.String(), new JsonSerde<>()))
          .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>()))
          // 5-minute tumbling windows that still accept records up to 1 minute late
          .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
          .aggregate(
              () -> 0L,
              (key, value, aggregate) -> aggregate + 1,
              Materialized.with(Serdes.String(), Serdes.Long())
          )
          // Hold back updates and emit one final count per window after window end + grace
          .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
          .toStream()
          // Keys are Windowed<String> now; flatten them into "orderId@windowStart" strings
          .map((windowedKey, count) -> KeyValue.pair(
              windowedKey.key() + "@" + windowedKey.window().start(), count))
          .to("order_counts_windowed", Produced.with(Serdes.String(), Serdes.Long()));
      
    • Why it works: The grace period keeps each window open for late records until stream time passes the window end plus the grace period; only then is the window closed and further records for it dropped. suppress(Suppressed.untilWindowCloses(...)) does not admit any extra data. It withholds the intermediate updates and emits a single final result per window once the window closes, which is why its output is delayed by the grace period. The map step is needed because a windowed aggregation produces Windowed<String> keys, which Serdes.String() cannot serialize.
  4. Misconfigured state.dir or Permissions: If Kafka Streams cannot write to its state directory, it cannot open the local state stores behind aggregations, joins, and windows, so the affected tasks fail instead of processing records, late or not.

    • Diagnosis: Check the logs for exceptions that mention the state directory, typically a ProcessorStateException or an underlying IOException.
    • Fix: Point state.dir at a persistent directory that the user running the application can read and write, and give each instance on the same host its own directory. The default location is under the system temp directory, which some hosts clean up periodically.
      state.dir=/path/to/your/kafka-streams-state
      
    • Why it works: Kafka Streams keeps aggregation, join, and window state in local stores under state.dir, backed by changelog topics. If those stores cannot be opened, the processing logic has nowhere to keep its state, and late-data handling fails along with everything else.
  5. Clock Skew or Incorrect System Time: Unless you extract event time from the payload, each record's timestamp comes from the producer's clock (CreateTime) or the broker's clock (LogAppendTime). If those clocks are not synchronized, events that happened in order can carry out-of-order timestamps, and Kafka Streams will treat some of them as late.

    • Diagnosis: Compare record timestamps with the event times in the payload across producers, and check whether the topic's message.timestamp.type is CreateTime or LogAppendTime. Check system logs for time synchronization warnings (e.g., from NTP).
    • Fix: Synchronize every host that produces records or runs a broker (and, for consistency, the Kafka Streams hosts) with a reliable Network Time Protocol (NTP) service, such as chrony:
      # On Debian/Ubuntu
      sudo apt-get install chrony
      sudo systemctl enable --now chrony
      chronyc tracking   # reports the offset from the reference clock
      
    • Why it works: Stream time and window boundaries are derived from record timestamps, not from the clock of the host running Kafka Streams. Skewed producer or broker clocks therefore shift records into the wrong windows or make them look late, and synchronized clocks keep timestamps consistent with the real order of events.
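  6. Incorrect TimestampExtractor: Without a custom TimestampExtractor, Kafka Streams ignores the timestamp field inside your JSON and uses each record's metadata timestamp. If you have implemented a custom extractor and it does not correctly parse or return event timestamps, Kafka Streams uses the wrong time reference and misjudges which records are late.

    • Diagnosis: Review your custom TimestampExtractor implementation. Log the extracted timestamps to verify they are correct.
    • Fix: Return the event time from the payload, and when a record has no usable event time, fall back to partitionTime (the highest timestamp extracted so far on that partition) rather than to wall-clock time. Register the extractor in the configuration, or per source with Consumed.with(...).withTimestampExtractor(...). The built-in extractors (FailOnInvalidTimestamp, which is the default, plus LogAndSkipOnInvalidTimestamp and UsePartitionTimeOnInvalidTimestamp) all read the record's metadata timestamp rather than a field in the value.
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.processor.TimestampExtractor;

      // Example custom TimestampExtractor; MyEvent stands in for your value type
      public class MyTimestampExtractor implements TimestampExtractor {
          @Override
          public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
              if (record.value() instanceof MyEvent) {
                  long ts = ((MyEvent) record.value()).getEventTimestamp();
                  if (ts >= 0) {
                      return ts; // event time carried in the payload
                  }
              }
              // No usable event time: reuse the partition's current time, not the wall clock
              return partitionTime >= 0 ? partitionTime : record.timestamp();
          }
      }

      // In the application setup:
      Properties props = new Properties();
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);
      KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);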
      
    • Why it works: The TimestampExtractor is the fundamental component that tells Kafka Streams what "time" it is for a given event. An incorrect extractor will misinform the stream processing engine about the event’s temporal position, causing it to treat timely events as late or vice-versa.
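Pulling the configuration-level settings from the causes above into one place, a minimal sketch might look like the following. It uses plain string keys so it runs without the Kafka client on the classpath; in a real application you would normally use the StreamsConfig constants (for example StreamsConfig.PROCESSING_GUARANTEE_CONFIG). The application ID, bootstrap address, state directory, and com.example.MyTimestampExtractor class name are placeholders. Grace periods are deliberately absent, because they belong to each window definition in the topology.

```java
import java.util.Properties;

// Consolidated Kafka Streams settings discussed above, as plain string keys.
// All argument values and the extractor class name are placeholders.
final class LateDataStreamsConfig {

    static Properties build(String applicationId, String bootstrapServers, String stateDir) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Cause 1: delivery semantics; exactly_once_v2 needs brokers 2.5 or newer.
        props.put("processing.guarantee", "exactly_once_v2");
        // Cause 2: log and skip records that cannot be deserialized instead of stopping.
        props.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        // Cause 4: a writable directory owned by this instance.
        props.put("state.dir", stateDir);
        // Cause 6: hypothetical extractor class that reads event time from the payload.
        props.put("default.timestamp.extractor", "com.example.MyTimestampExtractor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("order-counts", "localhost:9092", "/var/lib/kafka-streams");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Kafka Streams accepts fully qualified class names as strings for the handler and extractor settings, so the same Properties object can be passed to the KafkaStreams constructor once the real extractor class exists.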

The next error you’ll likely hit after correctly handling late data is related to state store re-initialization during application restarts, particularly if you’re using exactly_once processing and need to ensure idempotency across restarts.
