Kafka Streams’ TimestampExtractor is how the framework determines the "event time" of each record. This isn’t just a detail; it’s the mechanism that lets Kafka Streams assign records to windows and drive time-based operations by when events actually occurred, rather than when they arrived at the broker, which is crucial for accurate aggregations and windowing.

Let’s see it in action. Imagine we have a Kafka topic sensor-readings with messages like this:

{
  "sensor_id": "temp-001",
  "value": 22.5,
  "timestamp": 1678886400000
}

Here, timestamp is a Unix epoch value in milliseconds. By default, Kafka Streams uses the timestamp stored in the record’s metadata (the default extractor is FailOnInvalidTimestamp). That timestamp is set by the producer or, if the topic uses LogAppendTime, by the broker, and it’s often not what we want for true event-time processing. We want to use the timestamp field within our message.
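
As a quick sanity check on units, the sketch below converts the sample value to a human-readable instant using only the JDK. The names EpochMillisDemo and toIso are illustrative and not part of Kafka Streams. The point is that the field holds milliseconds, so it has to be read with Instant.ofEpochMilli rather than Instant.ofEpochSecond.

```java
import java.time.Instant;

// Illustrative helper; the names here are assumptions, not Kafka Streams API.
final class EpochMillisDemo {

    private EpochMillisDemo() {}

    // Interprets the value as milliseconds since the Unix epoch
    // and formats it as an ISO-8601 instant in UTC.
    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // The "timestamp" value from the sample sensor-readings message.
        System.out.println(toIso(1678886400000L)); // prints 2023-03-15T13:20:00Z
    }
}
```

If a payload carried seconds instead, the value would need to be multiplied by 1000 before being returned from an extractor, because Kafka Streams expects milliseconds.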

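To achieve this, we need to implement the TimestampExtractor interface, whose single method extract receives the ConsumerRecord and the current partition time. Here’s a custom implementation that assumes the value Serde is Serdes.String() and the payload is JSON: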

import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CustomTimestampExtractor implements TimestampExtractor {

    // ObjectMapper is thread-safe once configured, so a single shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // record.value() has already been deserialized by the value Serde of the source topic.
        // This example assumes Serdes.String(), so the value is a JSON string.
        // With Serdes.ByteArray() it would be a byte[]; with a POJO Serde, the POJO itself.
        Object value = record.value();

        // Null value (e.g. a tombstone) or an unexpected type: we can't extract,
        // so fall back to the timestamp stored in the record's metadata.
        if (!(value instanceof String)) {
            return record.timestamp();
        }

        try {
            // Parse the payload and read its "timestamp" field (epoch milliseconds).
            JsonNode root = MAPPER.readTree((String) value);
            JsonNode timestampNode = root == null ? null : root.get("timestamp");
            if (timestampNode != null && timestampNode.isIntegralNumber()) {
                return timestampNode.asLong();
            }
            System.err.println("Timestamp field missing or not an integer in record: " + value);
        } catch (IOException e) {
            // Malformed JSON
            System.err.println("Error extracting timestamp: " + e.getMessage());
        }

        // Fallback to the record's metadata timestamp on any error
        return record.timestamp();
    }
}

The most crucial part is how you configure Kafka Streams to use this TimestampExtractor. You do this in the Properties you pass to KafkaStreams, using the keys defined in StreamsConfig:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

// ...

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // Or your specific Serde

// THIS IS THE KEY CONFIGURATION
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());

// This default applies to every source topic. To use a different extractor for a single topic,
// pass Consumed.with(keySerde, valueSerde).withTimestampExtractor(...) to builder.stream(...).

When Kafka Streams processes a record, it calls your extract method with a ConsumerRecord<Object, Object>. Inside this method, record.value() is the deserialized value of the record. If the value Serde is Serdes.String(), record.value() is a String. If you’ve configured a JSON Serde for a SensorReading POJO, record.value() is a SensorReading object.

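record.timestamp() is the timestamp stored in the Kafka record’s metadata. With the default CreateTime setting it is set by the producer (explicitly, or to the send time); with LogAppendTime it is the broker’s append time. The second parameter of extract, partitionTime, is the highest valid timestamp extracted so far on that partition. Your TimestampExtractor’s job is to override the metadata timestamp with one from the message payload.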

The most surprising thing about TimestampExtractor is that it’s not just about reading a field; it’s about defining the source of truth for event time. You can extract timestamps from headers, from the payload (JSON, Avro, Protobuf), or even compute them from other fields. The metadata timestamp returned by record.timestamp() is only a fallback; your custom logic can ignore it entirely if the payload provides a more authoritative time.
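
The fallback idea can be sketched without any Kafka types. The class below is a hypothetical helper (EventTimeResolver and resolve are not part of Kafka Streams): it takes candidate timestamps from a header and from the payload, and falls back to the record’s metadata timestamp when neither is usable. It assumes a usable timestamp is a non-negative number of epoch milliseconds and that a header should win over the payload; both are choices you would adapt.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of the Kafka Streams API.
final class EventTimeResolver {

    private EventTimeResolver() {}

    // Returns the first usable candidate in priority order:
    // header timestamp, then payload timestamp, then the record's metadata timestamp.
    static long resolve(OptionalLong headerTs, OptionalLong payloadTs, long recordTs) {
        if (isUsable(headerTs)) {
            return headerTs.getAsLong();
        }
        if (isUsable(payloadTs)) {
            return payloadTs.getAsLong();
        }
        return recordTs;
    }

    // Assumption: a usable timestamp is present and non-negative.
    private static boolean isUsable(OptionalLong ts) {
        return ts.isPresent() && ts.getAsLong() >= 0;
    }

    public static void main(String[] args) {
        long recordTs = 1678886460000L; // invented metadata timestamp for the demo
        // Payload timestamp is valid, so it is chosen over the metadata timestamp.
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(1678886400000L), recordTs));
        // Payload timestamp is negative, so the metadata timestamp is used instead.
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(-1L), recordTs));
    }
}
```

Inside a real extractor, the return value of a method like resolve would simply be returned from extract.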

Here’s the mental model:

  1. Record Arrives: The producer sends a message and the broker writes it to the topic. The record carries a metadata timestamp, set by the producer (CreateTime, the default) or by the broker (LogAppendTime).
  2. Consumer Reads: Kafka Streams consumer reads the raw bytes of the message.
  3. Deserialization: The value Serde configured for the source topic deserializes the raw bytes into your application’s data type (e.g., String, byte[], SensorReading POJO).
  4. Timestamp Extraction: Kafka Streams invokes your TimestampExtractor’s extract method.
    • record.value() is the deserialized record value.
    • record.timestamp() is the timestamp from the Kafka record’s metadata.
  5. Event Time Assignment: Your extract method returns a long representing the event time. This is the timestamp Kafka Streams will use for all time-based operations (windowing, joins, aggregations).
  6. Processing: Kafka Streams uses this assigned event time for all subsequent operations within the stream processing topology.
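
To see why the value returned in step 5 matters, here is a minimal sketch of how a timestamp maps to a tumbling window, assuming epoch-aligned windows (window starts are multiples of the window size). TumblingWindowDemo and windowStart are hypothetical names used only for illustration; in a real topology the windowing operators do this for you. The arrival time of seven minutes after the event is an invented example value.

```java
// Hypothetical illustration, not part of the Kafka Streams API.
final class TumblingWindowDemo {

    private TumblingWindowDemo() {}

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long eventTime = 1678886400000L;                 // "timestamp" field from the payload
        long arrivalTime = eventTime + 7 * 60 * 1000L;   // invented: record arrived 7 minutes late

        // The same record lands in different windows depending on which time is used.
        System.out.println("event-time window start:   " + windowStart(eventTime, fiveMinutes));
        System.out.println("arrival-time window start: " + windowStart(arrivalTime, fiveMinutes));
    }
}
```

With a five-minute window, the two timestamps fall into different windows, so an aggregation keyed on arrival time would count the reading in the wrong bucket.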

The specific levers you control are:

  • The TimestampExtractor implementation: Your Java code that defines how to find the timestamp.
  • The StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG: This tells Kafka Streams which implementation to use.
  • Your Serdes configuration: Crucially, the extract method receives the deserialized value through record.value(). If your value Serde deserializes to a byte[] and your timestamp is in JSON, your TimestampExtractor needs to parse that byte[] as JSON. If your value Serde deserializes to a POJO, your TimestampExtractor can directly access the POJO’s timestamp field.

The one thing most people don’t fully grasp is how TimestampExtractor interacts with your Serdes. If you configure Serdes.String() for your value, record.value() will be a String. If your actual message is JSON, your TimestampExtractor has to parse that String (which represents JSON). If you configure Serdes.ByteArray(), record.value() will be a byte[], and you’ll parse those bytes. The most convenient way is often to have your value Serde deserialize directly into a POJO that has a timestamp getter, making the TimestampExtractor trivial.
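
A minimal sketch of that last point, using only the JDK so it runs on its own. SensorReading and PojoEventTime are hypothetical names, and the static eventTime method stands in for the body of extract: its two arguments play the role of record.value() and record.timestamp(). It assumes the value Serde already produces SensorReading objects.

```java
// Hypothetical value type; assumes the value Serde deserializes the JSON payload into it.
final class SensorReading {
    private final String sensorId;
    private final double value;
    private final long timestamp; // epoch milliseconds

    SensorReading(String sensorId, double value, long timestamp) {
        this.sensorId = sensorId;
        this.value = value;
        this.timestamp = timestamp;
    }

    String getSensorId() { return sensorId; }
    double getValue() { return value; }
    long getTimestamp() { return timestamp; }
}

// Stand-in for the body of TimestampExtractor.extract, without Kafka types.
final class PojoEventTime {

    private PojoEventTime() {}

    // value plays the role of record.value(); recordTimestamp the role of record.timestamp().
    static long eventTime(Object value, long recordTimestamp) {
        if (value instanceof SensorReading) {
            return ((SensorReading) value).getTimestamp();
        }
        // Null or unexpected type: keep the metadata timestamp.
        return recordTimestamp;
    }

    public static void main(String[] args) {
        SensorReading reading = new SensorReading("temp-001", 22.5, 1678886400000L);
        System.out.println(eventTime(reading, 1678886460000L)); // uses the POJO's timestamp
        System.out.println(eventTime(null, 1678886460000L));    // falls back to the metadata timestamp
    }
}
```

The extractor shrinks to a type check and a getter call because all parsing has already happened in the Serde.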

If you’ve correctly set the DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG and your TimestampExtractor correctly parses the payload, the next thing you’ll likely encounter is ensuring your windowing logic correctly aligns with the extracted event times.
