Kafka Streams punctuators are how you schedule periodic processing within your stream processing applications.

Let’s see one in action. Imagine you have a stream of user events, and you want to keep a running count of events per user and log the current counts every 5 seconds.

import java.time.Duration;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;

// Assume 'builder' is a StreamsBuilder and 'userEvents' is a KStream<String, UserEvent>
KTable<String, Long> userEventCounts = userEvents
    .groupByKey()
    .count(Materialized.as("user-event-counts-store")); // This is the state store

// Now, let's add a punctuator to this topology.
// KTable has no process() method, so convert it to a KStream first.
// This uses the typed Processor API (KStream#process with it requires Kafka Streams 3.3+).
userEventCounts.toStream().process(new ProcessorSupplier<String, Long, Void, Void>() {
    @Override
    public Processor<String, Long, Void, Void> get() {
        return new Processor<String, Long, Void, Void>() {
            // Stores materialized by DSL aggregations keep a timestamp next to each value
            private TimestampedKeyValueStore<String, Long> store;
            private final Duration punctuatorInterval = Duration.ofSeconds(5);

            @Override
            public void init(ProcessorContext<Void, Void> context) {
                // Get the state store we created earlier
                this.store = context.getStateStore("user-event-counts-store");

                // Schedule the punctuator
                context.schedule(punctuatorInterval, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                    System.out.println("--- Punctuation triggered at: " + timestamp + " ---");
                    // Here, we'll just log the current counts for demonstration
                    // In a real app, you might query the store, aggregate, or send to another topic
                    try (var iterator = store.all()) {
                        while (iterator.hasNext()) {
                            var entry = iterator.next();
                            // KeyValue exposes public fields; the count is wrapped in ValueAndTimestamp
                            System.out.println("User: " + entry.key + ", Count: " + entry.value.value());
                        }
                    }
                });
            }

            @Override
            public void process(Record<String, Long> record) {
                // This processor is just here to access the state store and schedule the punctuator.
                // The actual counting happens in the .count() operation upstream.
                // We don't need to do anything here for this specific example.
            }

            @Override
            public void close() {
                // Cleanup if necessary
            }
        };
    }
}, "user-event-counts-store"); // Connect the state store to the processor

The core problem punctuators solve is executing logic at regular intervals, independent of new data arriving. Think of it as a heartbeat for your stream processing. You might use them for:

  • Periodic aggregation and reporting: Calculating and sending out summary statistics (like the example above) every minute or hour.
  • State cleanup: Removing old or stale data from state stores to manage memory and disk usage.
  • Time-based actions: Triggering alerts, sending notifications, or performing maintenance tasks based on elapsed time.
  • Heartbeat mechanisms: Ensuring your application is still alive and processing.
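To make the state cleanup case more concrete, here is a minimal sketch of the logic a punctuator callback might run. It is not Kafka Streams code: a plain Map stands in for the state store, and the names StaleEntryCleaner, evictStale, lastSeenMs, and ttlMs are hypothetical. In a real processor you would iterate the store with all() and remove entries with delete() instead.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical helper: the Map is a stand-in for a key-value state store.
class StaleEntryCleaner {

    /**
     * Removes every entry whose last-seen time is more than ttlMs older than nowMs.
     * Returns the number of entries removed.
     */
    static int evictStale(Map<String, Long> lastSeenMs, long nowMs, long ttlMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            long lastSeen = it.next().getValue();
            if (nowMs - lastSeen > ttlMs) {
                it.remove(); // safe removal while iterating
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> lastSeen = new HashMap<>();
        lastSeen.put("alice", 1_000L);
        lastSeen.put("bob", 9_000L);
        // nowMs plays the role of the timestamp passed to the punctuator callback
        int removed = evictStale(lastSeen, 10_000L, 5_000L);
        System.out.println("removed=" + removed + ", remaining=" + lastSeen.keySet());
    }
}
```

Passing the punctuation timestamp in as nowMs, rather than reading the system clock inside the method, keeps the cleanup logic easy to test and lets it work with either punctuation type.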

Kafka Streams offers two types of punctuation:

  1. PunctuationType.WALL_CLOCK_TIME: This is based on the actual system time of the machine running the Kafka Streams application. It’s like a wall clock – it ticks at a consistent rate regardless of how much data is flowing. This is what we used in the example.
  2. PunctuationType.STREAM_TIME: This is based on the logical time within your Kafka Streams application, which is driven by the timestamps of the records being processed. Stream time only advances when new records arrive, so if your stream has periods of inactivity, STREAM_TIME punctuators will not fire. This keeps punctuation aligned with event time, which matters when you reprocess historical data, but it makes STREAM_TIME a poor fit for tasks that must run at fixed real-world intervals.
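The difference between the two types is easiest to see in a toy simulation. The sketch below does not use or reproduce the Kafka Streams scheduler; PunctuationModel, Schedule, and simulate are hypothetical names, and the way missed intervals are skipped is a simplifying assumption. It only illustrates that a schedule following wall-clock time keeps firing while a schedule following stream time stalls once records stop arriving.

```java
import java.util.Set;

// Toy model only: not the Kafka Streams implementation.
class PunctuationModel {

    /** Fires when the clock it follows reaches the next deadline. */
    static final class Schedule {
        private final long intervalMs;
        private long nextFireMs;
        private int fireCount = 0;

        Schedule(long startMs, long intervalMs) {
            this.intervalMs = intervalMs;
            this.nextFireMs = startMs + intervalMs;
        }

        /** Reports the current clock value; fires at most once per call. */
        void observe(long nowMs) {
            if (nowMs >= nextFireMs) {
                fireCount++;
                // Assumption: skip any missed slots instead of firing once per slot
                long missed = (nowMs - nextFireMs) / intervalMs;
                nextFireMs += (missed + 1) * intervalMs;
            }
        }

        int fireCount() {
            return fireCount;
        }
    }

    /**
     * Ticks the wall clock once per second for durationSec seconds.
     * Records arrive only at the seconds listed in recordSeconds.
     * Returns {wallClockFireCount, streamTimeFireCount}.
     */
    static int[] simulate(long durationSec, long intervalMs, Set<Long> recordSeconds) {
        Schedule wallClock = new Schedule(0, intervalMs);
        Schedule streamTime = new Schedule(0, intervalMs);
        for (long sec = 0; sec <= durationSec; sec++) {
            long nowMs = sec * 1000;
            wallClock.observe(nowMs); // wall-clock time always advances
            if (recordSeconds.contains(sec)) {
                streamTime.observe(nowMs); // stream time advances only with a record
            }
        }
        return new int[] { wallClock.fireCount(), streamTime.fireCount() };
    }

    public static void main(String[] args) {
        // Records arrive during the first 6 seconds, then the topic goes quiet
        int[] counts = simulate(60, 5_000, Set.of(0L, 1L, 2L, 3L, 4L, 5L, 6L));
        System.out.println("wall-clock fired " + counts[0] + ", stream-time fired " + counts[1]);
    }
}
```

With a 5-second interval over one simulated minute, the wall-clock schedule fires 12 times, while the stream-time schedule fires once and then waits for a record that never comes.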

The context.schedule() method is your entry point. It takes the interval as a java.time.Duration, the punctuation type, and a Punctuator (usually a lambda) that is executed when the interval elapses. The callback receives a long: the current wall-clock time or stream time in milliseconds, depending on the punctuation type. schedule() returns a Cancellable that you can use to stop the punctuation later.

When you attach a Processor that uses context.schedule(), Kafka Streams ensures that the init() method is called for that processor, and the punctuator is registered with the stream processing engine. The engine then manages the timing and execution of your callback.

The most surprising aspect of PunctuationType.WALL_CLOCK_TIME is its independence from record timestamps. You can schedule a punctuator to fire every 5 seconds, and it will fire roughly every 5 seconds according to the machine’s clock, even if your Kafka topics are completely empty for an hour. The timing is best-effort: punctuators run on the stream thread, so a long processing step or a GC pause can delay them. This still makes wall-clock punctuation a good fit for actions that need to happen on a real-time schedule, like sending out hourly reports or cleaning up state that’s older than a certain real-world duration. The timestamp passed to the callback is the wall-clock time when the punctuation occurred.

The next logical step after mastering periodic processing is handling out-of-order events and late data, which often involves understanding Kafka Streams’ windowing mechanisms and how they interact with STREAM_TIME punctuation.
