Interactive Queries let you access Kafka Streams’ internal state stores directly from your application, usually via a REST API.
Let’s see it in action. Imagine a Kafka Streams application that counts word occurrences from a Kafka topic.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// No @EnableKafkaStreams here: the KafkaStreams instance is built and started by hand below.
@SpringBootApplication
@RestController
public class KafkaStreamsQueryApplication {

    private static final String INPUT_TOPIC = "word-counts-input";
    private static final String OUTPUT_TOPIC = "word-counts-output";
    private static final String WORD_COUNTS_STORE = "word-counts-store";

    @Autowired
    private KafkaStreams streams;

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsQueryApplication.class, args);
    }

    // The @Bean methods are static so Spring can create these beans without first
    // instantiating this class, which itself needs the KafkaStreams bean injected.
    @Bean
    public static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey()
                // Materialized.as(...) names the state store so it can be queried later
                .count(Materialized.as(WORD_COUNTS_STORE))
                .toStream()
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    @Bean
    public static KafkaStreams kafkaStreams(Topology topology) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.id", "word-counts-app");
        props.put("state.dir", "/tmp/kafka-streams"); // Must be writable; created if missing
        props.put("default.key.serde", Serdes.StringSerde.class);
        props.put("default.value.serde", Serdes.LongSerde.class);
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        return streams;
    }

    // REST endpoint to query the state store for a single key
    @GetMapping("/counts/{word}")
    public Long getWordCount(@PathVariable String word) {
        Long count = wordCountStore().get(word);
        return count == null ? 0L : count;
    }

    // Endpoint to get all counts held by this instance
    @GetMapping("/counts")
    public Map<String, Long> getAllWordCounts() {
        Map<String, Long> allCounts = new HashMap<>();
        // The iterator holds store resources, so close it when done
        try (KeyValueIterator<String, Long> entries = wordCountStore().all()) {
            entries.forEachRemaining(entry -> allCounts.put(entry.key, entry.value));
        }
        return allCounts;
    }

    // Looks up the local, read-only view of the named state store
    private ReadOnlyKeyValueStore<String, Long> wordCountStore() {
        return streams.store(StoreQueryParameters.fromNameAndType(
                WORD_COUNTS_STORE, QueryableStoreTypes.keyValueStore()));
    }
}
This application reads records keyed by word from word-counts-input, counts them per key with groupByKey().count(), and materializes the counts in a state store named word-counts-store. The aggregation is the key step: count() makes Kafka Streams keep the running totals in a local state store backed by a changelog topic, and giving that store an explicit name (for example with Materialized.as("word-counts-store")) is what makes it queryable under that name. The REST endpoints /counts/{word} and /counts then query this local state store.
The problem Interactive Queries solve is that a Kafka Streams application is normally opaque from the outside: it consumes from Kafka, processes, and produces to Kafka, and its state is not visible to other services. If you want to know the current aggregate state (like the total count of a word), you’d normally have to consume the output topic and maintain your own external store. Interactive Queries bypass this by letting you query the internal state store that Kafka Streams is already maintaining for its own processing.
Internally, Kafka Streams distributes the input partitions across the running instances of your application, and each instance maintains local state stores only for the partitions it is responsible for. When you call streams.store(...), you get access to the local state store of that specific instance. If you query for a key that is managed by another instance, store.get(key) returns null, which is indistinguishable from a key that has never been seen. To handle this robustly, each instance advertises its own host and port through the application.server setting, and the REST endpoint uses KafkaStreams#queryMetadataForKey to find out which instance hosts the partition for a given key, forwarding the request when that instance is not the local one. The example above is a simplified view that assumes the queried key resides on the instance serving the REST request.
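The routing decision itself is small, and can be sketched without any Kafka dependency. The following is a minimal illustration under stated assumptions, not the Kafka Streams API: KeyRoutingSketch, Host, and Answer are hypothetical names, the ownerOf function stands in for the metadata lookup that Kafka Streams provides (KafkaStreams#queryMetadataForKey), the map stands in for the local state store, and the rule that assigns words to hosts is made up for the demo.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Sketch only: plain-Java stand-ins, no Kafka Streams types involved. */
final class KeyRoutingSketch {

    /** Hypothetical stand-in for the host and port an instance advertises. */
    record Host(String name, int port) {}

    /** Either a locally answered count, or the URL of the instance that owns the key. */
    record Answer(Optional<Long> localCount, Optional<String> forwardUrl) {}

    private final Host self;
    private final Function<String, Optional<Host>> ownerOf; // stand-in for the metadata lookup
    private final Map<String, Long> localStore;             // stand-in for the local state store

    KeyRoutingSketch(Host self,
                     Function<String, Optional<Host>> ownerOf,
                     Map<String, Long> localStore) {
        this.self = self;
        this.ownerOf = ownerOf;
        this.localStore = localStore;
    }

    Answer lookup(String word) {
        // The owner may be unknown for a while, e.g. while partitions are being reassigned.
        Host owner = ownerOf.apply(word)
                .orElseThrow(() -> new IllegalStateException("owner of key not known yet, retry later"));
        if (owner.equals(self)) {
            // This instance owns the key, so a missing entry really means "never counted".
            return new Answer(Optional.of(localStore.getOrDefault(word, 0L)), Optional.empty());
        }
        // Another instance owns the key: report where the request should be forwarded.
        String url = "http://" + owner.name() + ":" + owner.port() + "/counts/" + word;
        return new Answer(Optional.empty(), Optional.of(url));
    }

    public static void main(String[] args) {
        Host a = new Host("host-a", 8080);
        Host b = new Host("host-b", 8080);
        // Made-up ownership rule for the demo: words sorting before "n" live on host-a.
        Function<String, Optional<Host>> ownerOf =
                word -> Optional.of(word.compareTo("n") < 0 ? a : b);
        KeyRoutingSketch onA = new KeyRoutingSketch(a, ownerOf, Map.of("kafka", 3L));
        System.out.println(onA.lookup("kafka"));   // answered from the local store
        System.out.println(onA.lookup("streams")); // points at host-b
    }
}
```

The important design point is that the null-to-zero conversion only happens once ownership has been confirmed. An instance that does not own the key never answers with 0; it forwards instead.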
The most powerful aspect of Interactive Queries is that they operate directly on the state stores that Kafka Streams uses for its internal processing. The data you query therefore reflects every record that instance has processed so far, because it’s the same data powering the stream processing itself. You don’t need a separate data pipeline or a secondary materialization step for querying. Persistent state stores are backed by RocksDB by default, an embedded key-value store, so point lookups are served locally without a network round trip.
When you define a KTable or a GlobalKTable (for example with builder.table(), builder.globalTable(), KStream#toTable(), or an aggregation such as count()), Kafka Streams creates and manages the backing state store. To query the store, give it an explicit name with Materialized.as(...); without one, Kafka Streams generates an internal name that may not be queryable. That name (word-counts-store in the example) is what you pass to streams.store() to retrieve the store. The QueryableStoreTypes class provides factory methods for the different kinds of store, such as keyValueStore(), timestampedKeyValueStore(), windowStore(), and sessionStore(); pick the one that matches the type of state you’ve materialized.
A common pitfall is leaving state.dir at its default. If you don’t set it, Kafka Streams falls back to a kafka-streams directory under the system temporary directory, which may be cleared on reboot; the stores then have to be rebuilt from their changelog topics before they can be queried again. For anything beyond local experiments, point state.dir at a durable location, and make sure the path (e.g., /tmp/kafka-streams in the example) is writable by the Kafka Streams application process, since the application cannot start if it is unable to create its state directory.
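One way to catch an unusable state directory early is to check it before constructing KafkaStreams. The sketch below uses only the JDK; StateDirSketch and withStateDir are illustrative names, not part of Kafka Streams, and the directory used in main is a hypothetical location chosen for the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Sketch only: validates a directory and records it under the "state.dir" key. */
final class StateDirSketch {

    /**
     * Returns a copy of the given properties with "state.dir" set to the absolute
     * path of stateDir, after creating the directory if needed and checking that
     * the current process can write to it.
     */
    static Properties withStateDir(Properties base, Path stateDir) {
        try {
            Files.createDirectories(stateDir); // no-op if the directory already exists
        } catch (IOException e) {
            throw new UncheckedIOException("cannot create state.dir " + stateDir, e);
        }
        if (!Files.isWritable(stateDir)) {
            throw new IllegalStateException("state.dir is not writable: " + stateDir);
        }
        Properties props = new Properties();
        props.putAll(base); // leave the caller's properties untouched
        props.put("state.dir", stateDir.toAbsolutePath().toString());
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "word-counts-app");
        // Hypothetical location for the demo; prefer a durable path in production.
        Path dir = Path.of(System.getProperty("java.io.tmpdir"), "kafka-streams-demo");
        System.out.println(withStateDir(base, dir).getProperty("state.dir"));
    }
}
```

The returned Properties can be passed to the KafkaStreams constructor in place of the hand-built ones, so a bad path fails with a clear message at startup rather than later during processing.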
The next step is to understand how to handle queries for keys that reside on different instances of your Kafka Streams application.