A Kafka Streams wall-clock punctuator can execute actions based on the passage of real-world time, independent of the timestamps embedded within Kafka records.
Let’s see this in action. Imagine you have a stream of user activity events, each with a timestamp field indicating when the activity actually happened. You want to send a daily digest of user activity to each user, regardless of whether they generated any activity on a given day. A wall-clock punctuator is perfect for this.
Here’s a simplified Java example:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import java.time.Duration;
import java.util.Properties;

public class DailyDigestJob {

    // Uses the Processor API in org.apache.kafka.streams.processor.api,
    // which KStream#process accepts as of Kafka Streams 3.3.
    // Kafka Streams creates one instance per stream task (roughly, per input partition),
    // so the punctuator below fires once per task, not once per user.
    static class DailyDigestProcessor implements Processor<String, String, Void, Void> {
        private ProcessorContext<Void, Void> context;
        // -1 means "never sent". Held in memory only, so it resets if the task restarts or moves.
        private long lastDigestSentTimestamp = -1;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            this.context = context;
            // Schedule a punctuator to run every hour (for demonstration).
            // shouldSendDailyDigest() ensures the digest itself goes out at most once per day.
            context.schedule(Duration.ofMinutes(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                if (shouldSendDailyDigest(timestamp)) {
                    // In a real app, you'd read the day's accumulated activity for every user
                    // handled by this task (e.g., from a state store) and send one digest per user.
                    // A punctuation has no current record, so there is no user key available here.
                    System.out.println("Sending daily digests for task " + context.taskId()
                            + " at wall-clock time: " + timestamp);
                    // Remember when we sent, so the next digest waits another day
                    lastDigestSentTimestamp = timestamp;
                }
            });
        }

        @Override
        public void process(Record<String, String> record) {
            // Process individual user activities here if needed (e.g., accumulate them for the digest).
            // For the daily digest, the punctuator handles the timing.
        }

        @Override
        public void close() {
            // Clean up resources if necessary
        }

        private boolean shouldSendDailyDigest(long currentWallClockTimestamp) {
            // Simplified logic: send on the first punctuation, then whenever at least 24 hours
            // have passed since the last digest. A real implementation would check for a specific
            // time of day (e.g., midnight UTC) and ensure it's only sent once per day.
            if (lastDigestSentTimestamp == -1) {
                return true; // First time running, send immediately
            }
            return (currentWallClockTimestamp - lastDigestSentTimestamp) >= Duration.ofDays(1).toMillis();
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "daily-digest-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Assume 'userActivity' is a KStream<String, String> where key is userId and value is activity details.
        // Records are keyed (and therefore partitioned) by user ID, so each task sees all activity for its users.
        KStream<String, String> userActivity = builder.stream("user-activity-topic");
        // KGroupedStream has no process() method, so the processor is attached directly to the KStream.
        userActivity.process(DailyDigestProcessor::new);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
The core idea is context.schedule(Duration.ofMinutes(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> { ... }). This tells Kafka Streams: "Hey, every 60 minutes, based on the actual clock on the machine running this Kafka Streams application, call this provided lambda function and pass it the current real-world timestamp."
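The hourly interval only controls how often the check runs; the decision to actually send lives in shouldSendDailyDigest, which compares elapsed milliseconds and therefore drifts relative to the calendar. Its comments suggest aligning to midnight UTC instead. Here is a minimal sketch of that check using only java.time. The class and method names are hypothetical, and it assumes the goal is one digest per UTC calendar day.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Kafka Streams: decides whether a new UTC day has started
// since the last digest, given the epoch-millisecond timestamp a punctuator receives.
public class UtcDigestSchedule {

    // Returns true if at least one midnight UTC lies between lastSentMillis and nowMillis.
    // lastSentMillis == -1 means "never sent", matching the original example's convention.
    public static boolean crossedUtcMidnight(long lastSentMillis, long nowMillis) {
        if (lastSentMillis == -1) {
            return true;
        }
        return utcDate(nowMillis).isAfter(utcDate(lastSentMillis));
    }

    // Converts epoch milliseconds to the corresponding calendar date in UTC.
    public static LocalDate utcDate(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        long lastSent = Instant.parse("2024-05-01T23:00:00Z").toEpochMilli();
        long sameDay = Instant.parse("2024-05-01T23:59:59Z").toEpochMilli();
        long nextDay = Instant.parse("2024-05-02T00:00:00Z").toEpochMilli();
        System.out.println(crossedUtcMidnight(lastSent, sameDay)); // false
        System.out.println(crossedUtcMidnight(lastSent, nextDay)); // true
    }
}
```

With an hourly punctuator, this sends at the first punctuation after midnight UTC, so within roughly an hour of midnight (later if punctuations are delayed). The last-sent time would still need to be persisted; otherwise a restart resets it to -1 and triggers an extra digest.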
This is incredibly powerful because it decouples your application’s logic from the data itself. You can perform time-based maintenance, batching, aggregation, or alerts that must happen at specific real-world intervals, even if there’s no data flowing through Kafka at that exact moment.
The fundamental problem this solves is executing side effects or periodic tasks in a distributed stream processing system that are tied to the system’s clock, not the event’s clock. Think of it like a cron job for your stream processing. You might want to:
- Send periodic reports: Like the daily digest example.
- Clean up state: If you’re maintaining a cache or state store, you might want to periodically expire old entries based on real time.
- Trigger alerts: If a certain condition hasn’t been met for a specific duration (e.g., no user login for 24 hours), you can use a wall-clock punctuator to check and trigger an alert.
- Heartbeat/Liveness checks: Periodically emit an "I’m alive" message.
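The alert case comes down to a scan performed by the punctuator: compare each user's last activity with the wall-clock timestamp the punctuator receives. The sketch below models that in plain Java, with a HashMap standing in for a state store; InactivityCheck and its methods are hypothetical names, not Kafka Streams APIs.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the map stands in for a key-value state store (userId -> last login,
// epoch millis). In a real topology, process() would update the store and a WALL_CLOCK_TIME
// punctuator would call idleUsers(timestamp) and emit alerts.
public class InactivityCheck {

    private final Map<String, Long> lastLoginByUser = new HashMap<>();
    private final long thresholdMillis;

    public InactivityCheck(Duration threshold) {
        this.thresholdMillis = threshold.toMillis();
    }

    // Called for each login event; keeps the most recent login time per user.
    public void recordLogin(String userId, long loginTimeMillis) {
        lastLoginByUser.merge(userId, loginTimeMillis, Long::max);
    }

    // Called with the punctuator's wall-clock timestamp; returns (sorted) users whose last
    // login is at least the threshold in the past.
    public List<String> idleUsers(long wallClockMillis) {
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastLoginByUser.entrySet()) {
            if (wallClockMillis - entry.getValue() >= thresholdMillis) {
                idle.add(entry.getKey());
            }
        }
        idle.sort(Comparator.naturalOrder());
        return idle;
    }

    public static void main(String[] args) {
        InactivityCheck check = new InactivityCheck(Duration.ofHours(24));
        long now = Duration.ofDays(10).toMillis();
        check.recordLogin("alice", now - Duration.ofHours(30).toMillis());
        check.recordLogin("bob", now - Duration.ofHours(2).toMillis());
        System.out.println(check.idleUsers(now)); // [alice]
    }
}
```

Users who never logged in are absent from the map and are never reported; covering them would need a separate record of known users. Removing the idle entries instead of reporting them turns the same scan into the state-cleanup case.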
PunctuationType.WALL_CLOCK_TIME is the key differentiator. The alternative, PunctuationType.STREAM_TIME, triggers based on the timestamps within your Kafka records: stream time only advances when records with newer timestamps are processed. If your topics go quiet, or your records carry stale timestamps, STREAM_TIME punctuators may not fire at all, or may fire at moments unrelated to real-world time. WALL_CLOCK_TIME is driven by the system clock and fires whether or not records arrive, within the best-effort limits described below.
You can schedule punctuators at different granularities: Duration.ofMillis(100), Duration.ofMinutes(5), Duration.ofHours(1), etc. The timestamp passed to your lambda is a long representing milliseconds since the epoch, just like Kafka record timestamps.
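Because the punctuator's timestamp and record timestamps share a unit, you can subtract one from the other directly, and java.time can render either for logging. A small sketch; the helper names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helpers for working with the long a punctuator receives (epoch milliseconds).
public class PunctuationTimestamps {

    // Same unit as record timestamps, so plain subtraction gives the wall-clock gap
    // between a punctuation and, say, the last record seen for some key.
    public static Duration sinceLastRecord(long punctuateTimestamp, long lastRecordTimestamp) {
        return Duration.ofMillis(punctuateTimestamp - lastRecordTimestamp);
    }

    // Renders an epoch-millisecond timestamp as an ISO-8601 UTC string for logging.
    public static String toUtcString(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long punctuateTs = 1_700_000_000_000L;
        long lastRecordTs = punctuateTs - Duration.ofMinutes(90).toMillis();
        System.out.println(toUtcString(punctuateTs));                   // 2023-11-14T22:13:20Z
        System.out.println(sinceLastRecord(punctuateTs, lastRecordTs)); // PT1H30M
    }
}
```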
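The ProcessorContext passed to init() is where you access the scheduler. Each Processor instance within your Kafka Streams topology can have its own scheduled punctuators, so different parts of your application can have independent time-based triggers. Note that Kafka Streams creates a separate processor instance for each stream task (roughly, one per input partition), so a punctuator fires once per task, not once per key. To act per user, the punctuator has to iterate over the users that task has seen, typically kept in a state store.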
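Kafka Streams creates one processor instance per stream task, so a per-user digest has to be built by walking whatever that task has accumulated. A plain-Java model of one such instance shows the pattern: process() accumulates per-user activity, and the punctuator emits one line per user and resets. PerTaskDigestBuffer is a hypothetical stand-in, and the TreeMap is an assumption in place of a real state store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a single processor instance (one stream task). Each task has its own
// instance, so its punctuator only sees users whose records landed on that task's partition.
public class PerTaskDigestBuffer {

    // userId -> number of activity events since the last digest. A TreeMap keeps output sorted;
    // a real application would use a state store so the counts survive restarts.
    private final Map<String, Integer> activityCount = new TreeMap<>();

    // Mirrors process(): called once per activity record, keyed by userId.
    public void onRecord(String userId) {
        activityCount.merge(userId, 1, Integer::sum);
    }

    // Mirrors the punctuator: one digest line per user seen by this task, then reset.
    public List<String> onPunctuate() {
        List<String> digests = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : activityCount.entrySet()) {
            digests.add(entry.getKey() + ": " + entry.getValue());
        }
        activityCount.clear();
        return digests;
    }

    public static void main(String[] args) {
        PerTaskDigestBuffer task = new PerTaskDigestBuffer();
        task.onRecord("bob");
        task.onRecord("alice");
        task.onRecord("bob");
        System.out.println(task.onPunctuate()); // [alice: 1, bob: 2]
        System.out.println(task.onPunctuate()); // []
    }
}
```

Users with no activity since the last digest are not in the map, so sending them a digest anyway, as the opening scenario asks, would need a separate record of known users.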
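Crucially, WALL_CLOCK_TIME punctuation is best-effort. The stream thread only checks whether a punctuation is due between iterations of its processing loop, so a busy thread, a slow processing step, or a long GC pause can delay it. If the delay spans more than one interval, the missed punctuations are skipped rather than replayed: the punctuator fires once, receives the current wall-clock time, and the schedule carries on. This is fine for most periodic tasks, which care about running roughly once per interval rather than at an exact millisecond. If every period must be accounted for, as with the daily digest, track the last run yourself (as shouldSendDailyDigest does) instead of counting punctuations.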
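The skipping behavior can be modeled in a few lines. This is a simplified model written for illustration, not Kafka Streams source code: it assumes the thread checks punctuations at the given wall-clock times, fires at most once per check with the current time, and keeps the schedule on its original interval grid while dropping any intervals that passed unpunctuated.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption for illustration) of best-effort wall-clock punctuation.
public class WallClockPunctuationModel {

    // firstDue: when the first punctuation is due; interval: the scheduled interval;
    // checkTimes: ascending wall-clock times at which the stream thread checks punctuators.
    // Returns the timestamps that would be passed to the punctuator.
    public static List<Long> fired(long firstDue, long interval, long[] checkTimes) {
        List<Long> result = new ArrayList<>();
        long nextDue = firstDue;
        for (long now : checkTimes) {
            if (now >= nextDue) {
                result.add(now); // the callback sees the current time, not the due time
                long missed = (now - nextDue) / interval; // extra intervals that elapsed unpunctuated
                nextDue += (missed + 1) * interval;       // skip them; stay on the original grid
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Units are minutes for readability; Kafka Streams works in epoch milliseconds.
        long[] checks = {30, 61, 125, 300, 305, 360};
        System.out.println(fired(60, 60, checks)); // [61, 125, 300, 360]
    }
}
```

In the sample run the thread stalls between 125 and 300, so the punctuations due at 180, 240 and 300 collapse into a single call with timestamp 300.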
The most surprising thing about wall-clock punctuators is that their execution is not inherently tied to any specific Kafka record. While they are scheduled within the context of a Processor that is processing records, the punctuator’s trigger is independent of whether records are arriving or what their timestamps are. This allows you to perform actions even during "quiet" periods on your Kafka topics, which is essential for tasks that need to happen on a real-world schedule.
The next concept you’ll likely encounter is how to manage state across these punctuator invocations, especially if you need to aggregate data over time before sending a digest or triggering an alert.