The Kafka Streams Processor API lets you build complex, stateful stream processing applications that go beyond what the DSL's abstractions offer.
Let’s say you have a stream of user click events, and you want to count how many times each user clicks on a specific product category within a 5-minute window. The Kafka Streams DSL can handle this, but what if you need to do something more nuanced, like detect a "burst" of activity from a single user within a short period, or integrate with an external service to enrich the click data before processing? That’s where the Processor API shines. It gives you fine-grained control over how records are processed and state is managed.
Here’s a simple example. Imagine we have a topic clicks with messages like {"user_id": "user123", "category": "electronics", "timestamp": 1678886400000}. We want to build a topology that counts clicks per user for the "electronics" category.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Properties;

public class CustomProcessorTopology {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "custom-processor-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();

        // Define the state store and register it with the topology
        final String countStoreName = "user-category-counts";
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(countStoreName),
                Serdes.String(), // Key: user_id
                Serdes.Long()    // Value: count
        ));

        // Route records from the input topic to the custom processor.
        // This uses the classic Processor interface (init/process/close), which recent
        // Kafka releases deprecate in favor of org.apache.kafka.streams.processor.api.Processor.
        builder.<String, String>stream("clicks")
                // Records arrive without a key, so re-key them by user_id.
                // process() does not repartition by itself: if "clicks" is not already
                // partitioned by user, you may need .repartition() (Kafka 2.6+) before it.
                .selectKey((key, value) -> extractUserIdFromJson(value))
                .process(new ProcessorSupplier<String, String>() {
                    @Override
                    public Processor<String, String> get() {
                        return new Processor<String, String>() {
                            private KeyValueStore<String, Long> store;

                            @Override
                            @SuppressWarnings("unchecked")
                            public void init(ProcessorContext context) {
                                // Look up the state store attached to this processor
                                this.store = (KeyValueStore<String, Long>) context.getStateStore(countStoreName);
                                // Once per minute of wall-clock time, print the current counts.
                                // A real application might emit them to a topic or clean up old entries.
                                context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                                    System.out.println("Punctuation triggered at: " + timestamp);
                                    // Store iterators must be closed, hence try-with-resources
                                    try (KeyValueIterator<String, Long> entries = store.all()) {
                                        while (entries.hasNext()) {
                                            KeyValue<String, Long> entry = entries.next();
                                            System.out.println("User: " + entry.key + ", Count: " + entry.value);
                                        }
                                    }
                                });
                            }

                            @Override
                            public void process(String userId, String value) {
                                if ("electronics".equals(getCategoryFromJson(value))) {
                                    Long previous = store.get(userId); // null on the user's first click
                                    long newCount = (previous == null ? 0L : previous) + 1;
                                    store.put(userId, newCount);
                                    System.out.println("Processed click for user: " + userId + ", New count: " + newCount);
                                }
                            }

                            @Override
                            public void close() {
                                // Nothing to clean up; Kafka Streams manages the store's lifecycle
                            }
                        };
                    }
                }, countStoreName); // Names of the state stores this processor may access

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        // Clean up local state if necessary for development/testing
        // streams.cleanUp();

        // Register the shutdown hook before starting so the app always closes cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    // Helper to extract user ID from a JSON string. In a real app, use a JSON library.
    private static String extractUserIdFromJson(String json) {
        // Simple parsing for example: assumes {"user_id": "...", ...}
        String marker = "\"user_id\": \"";
        int markerIndex = (json == null) ? -1 : json.indexOf(marker);
        if (markerIndex != -1) {
            int startIndex = markerIndex + marker.length();
            int endIndex = json.indexOf("\"", startIndex);
            if (endIndex != -1) {
                return json.substring(startIndex, endIndex);
            }
        }
        return "unknown_user"; // Fallback
    }

    // Helper to extract the category, using the same simplified parsing
    private static String getCategoryFromJson(String json) {
        // Simple parsing for example: assumes {"category": "...", ...}
        String marker = "\"category\": \"";
        int markerIndex = (json == null) ? -1 : json.indexOf(marker);
        if (markerIndex != -1) {
            int startIndex = markerIndex + marker.length();
            int endIndex = json.indexOf("\"", startIndex);
            if (endIndex != -1) {
                return json.substring(startIndex, endIndex);
            }
        }
        return "other"; // Fallback
    }
}
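The string-search helpers in the example depend on the exact spacing of the JSON. As a slightly more tolerant sketch that still uses only the JDK, the field lookup could be done with a regular expression. The class name JsonFields and the method stringField are hypothetical names introduced for this illustration, and the pattern does not handle escaped quotes or nested objects, so a real application should still prefer a proper JSON library.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration: extracts simple string fields from flat JSON. */
final class JsonFields {
    private JsonFields() {}

    /** Returns the string value of the given field, or empty if it is absent or not a string. */
    static Optional<String> stringField(String json, String field) {
        if (json == null) {
            return Optional.empty();
        }
        // Matches "field" : "value" with optional whitespace around the colon
        Pattern pattern = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher matcher = pattern.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String click = "{\"user_id\": \"user123\", \"category\": \"electronics\", \"timestamp\": 1678886400000}";
        System.out.println(stringField(click, "user_id").orElse("unknown_user")); // user123
        System.out.println(stringField(click, "category").orElse("other"));       // electronics
    }
}
```

Returning an Optional leaves the choice of fallback ("unknown_user", "other") to the caller rather than baking it into the parser.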
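The core of the Processor API is the Processor interface. You implement init(ProcessorContext context), process(K key, V value), and close(). The ProcessorContext gives you access to state stores (getStateStore()), scheduling capabilities (schedule(), for "punctuators"), and the ability to forward records downstream (forward()). Recent Kafka releases deprecate this classic interface in favor of org.apache.kafka.streams.processor.api.Processor, whose process() method takes a single Record argument, but the concepts carry over unchanged.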
Crucially, you need to define state stores and add them to your topology using builder.addStateStore(). In the example, Stores.keyValueStoreBuilder creates a KeyValueStore. Each Kafka Streams instance holds only the part of the store that belongs to the partitions it processes, and by default every update is also written to a Kafka changelog topic, so even an in-memory store can be rebuilt after a failure or rebalance.
A custom processor is registered either with Topology.addProcessor(), when you assemble the topology by hand, or, from a StreamsBuilder, with stream.process(ProcessorSupplier, storeNames). The latter is the most direct way: it tells Kafka Streams to send records from the stream to the processor, and it makes the stores named in storeNames available within the processor's init method.
Notice the context.schedule() call. It lets you run code periodically, driven either by stream time (the timestamps of the records being processed) or by wall-clock time, depending on the PunctuationType you pass. This is useful for flushing aggregated data, performing cleanup, or implementing time-based logic that doesn't directly map to tumbling or sliding windows in the DSL.
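As an example of time-based logic that sits outside the DSL's windows, the "burst" detection mentioned in the introduction can be sketched in plain Java. This is only the core bookkeeping, independent of Kafka: BurstDetector, its threshold, and its window length are hypothetical names and parameters chosen for illustration, the per-user timestamps would live in a state store inside a real processor, and the sketch assumes events arrive in roughly increasing timestamp order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Flags a user once they produce at least `threshold` events within `windowMs` milliseconds. */
final class BurstDetector {
    private final int threshold;
    private final long windowMs;
    // Recent event timestamps per user; in a processor this would be kept in a state store
    private final Map<String, Deque<Long>> recent = new HashMap<>();

    BurstDetector(int threshold, long windowMs) {
        this.threshold = threshold;
        this.windowMs = windowMs;
    }

    /** Records one event and returns true if the user is currently in a burst. */
    boolean record(String userId, long timestampMs) {
        Deque<Long> times = recent.computeIfAbsent(userId, id -> new ArrayDeque<>());
        times.addLast(timestampMs);
        // Drop timestamps that have fallen out of the window ending at this event
        while (!times.isEmpty() && times.peekFirst() <= timestampMs - windowMs) {
            times.removeFirst();
        }
        return times.size() >= threshold;
    }

    public static void main(String[] args) {
        BurstDetector detector = new BurstDetector(3, 10_000L);
        System.out.println(detector.record("userA", 1_000L));  // false
        System.out.println(detector.record("userA", 2_000L));  // false
        System.out.println(detector.record("userA", 3_000L));  // true: three events within 10 s
        System.out.println(detector.record("userA", 60_000L)); // false: earlier events expired
    }
}
```

Inside a processor, record() would be called from process(), and a punctuation could periodically evict users whose timestamps have all expired so the state does not grow without bound.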
The most surprising thing about the Processor API is how easily it mixes with the DSL. While you can define a standalone Processor and wire it into a Topology by hand, the KStream.process() method is often how you'll integrate a custom Processor into a larger topology, especially when it needs access to state stores.
Here’s the topology in action. Imagine you have Kafka running and a topic named clicks. You’d produce messages like this (using kafka-console-producer or a client library):
kafka-console-producer --bootstrap-server localhost:9092 --topic clicks
{"user_id": "userA", "category": "electronics", "timestamp": 1678886400000}
{"user_id": "userB", "category": "books", "timestamp": 1678886401000}
{"user_id": "userA", "category": "electronics", "timestamp": 1678886402000}
{"user_id": "userA", "category": "clothing", "timestamp": 1678886403000}
{"user_id": "userA", "category": "electronics", "timestamp": 1678886404000}
When you run the Java application and produce these messages, you’ll see output like this in your application logs:
Processed click for user: userA, New count: 1
Processed click for user: userA, New count: 2
Processed click for user: userA, New count: 3
Punctuation triggered at: 1678886400000
User: userA, Count: 3
The Punctuation triggered message appears once per minute because of the context.schedule(java.time.Duration.ofMinutes(1), ...) call; the timestamp shown above is illustrative, and the actual value depends on when the punctuation fires. This allows you to inspect or act upon the state at regular intervals. If you stop and restart the application, the state is restored from the changelog topic and the counts continue from where they left off.
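The restore-on-restart behavior can be pictured with a toy model in plain Java. This is not how Kafka Streams is implemented; it only illustrates the idea that replaying an append-only log of updates rebuilds the local state. ChangelogBackedStore is a hypothetical class for this sketch, the list stands in for the changelog topic, and deletes (tombstones) are not modeled.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: a local map whose every update is also appended to a shared log. */
final class ChangelogBackedStore {
    private final List<Map.Entry<String, Long>> changelog; // stand-in for the changelog topic
    private final Map<String, Long> local = new HashMap<>();

    ChangelogBackedStore(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
        // Restore: replay every update in order, so the latest value per key wins
        for (Map.Entry<String, Long> update : changelog) {
            local.put(update.getKey(), update.getValue());
        }
    }

    void put(String key, long value) {
        local.put(key, value);
        changelog.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    Long get(String key) {
        return local.get(key);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> log = new ArrayList<>();
        ChangelogBackedStore beforeRestart = new ChangelogBackedStore(log);
        beforeRestart.put("userA", 1L);
        beforeRestart.put("userA", 2L);
        beforeRestart.put("userA", 3L);

        // "Restart": the in-memory map is gone, but the log survives
        ChangelogBackedStore afterRestart = new ChangelogBackedStore(log);
        System.out.println(afterRestart.get("userA")); // 3
    }
}
```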
The Processor API is your escape hatch when the DSL becomes too restrictive. It's also the foundation upon which the DSL itself is built. Understanding it gives you the power to implement virtually any stream processing logic. The KeyValueStore is just one type of state store; Kafka Streams supports WindowStore and SessionStore as well, along with timestamped variants such as TimestampedKeyValueStore, enabling more complex time-series processing.
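Under very high throughput, parallelism comes from partitioning: Kafka Streams creates one task per input partition, and each task gets its own Processor instance (which is why you pass a ProcessorSupplier rather than a Processor) and its own shard of every attached state store. You can also call KStream.process(ProcessorSupplier, storeNames) on multiple branches of your topology, each with its own dedicated set of state stores, but records for a given key must arrive on the same partition for per-key state to be correct.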
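To see why keying matters for parallel processing, here is a toy model in plain Java of per-partition state. It is a simplification under stated assumptions: PartitionedCounts is a hypothetical class, each map stands in for one task's local store, and String.hashCode() is only a stand-in for the real partitioner (Kafka's default partitioner hashes the serialized key bytes with murmur2).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: one local count store per partition, as if one task handled each partition. */
final class PartitionedCounts {
    private final List<Map<String, Long>> storePerTask = new ArrayList<>();

    PartitionedCounts(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            storePerTask.add(new HashMap<>());
        }
    }

    /** Stand-in partitioner: the same key always maps to the same partition. */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), storePerTask.size());
    }

    /** Counts a click in the store of the task that owns the user's partition. */
    long click(String userId) {
        return storePerTask.get(partitionFor(userId)).merge(userId, 1L, Long::sum);
    }

    /** Read-only view of one task's local store. */
    Map<String, Long> storeOf(int partition) {
        return storePerTask.get(partition);
    }

    public static void main(String[] args) {
        PartitionedCounts counts = new PartitionedCounts(4);
        counts.click("userA");
        counts.click("userB");
        System.out.println(counts.click("userA")); // 2: both userA clicks hit the same store
    }
}
```

Because every click for a user lands in the same store, no task ever needs to see another task's state, which is what lets the work scale out across partitions.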