Kafka Streams offers many operations for transforming events. Two of the most basic stateless ones are filter and flatMap.

Let’s see them in action. Imagine we have a Kafka topic named raw-events containing JSON strings representing user activity. We want to process these events to extract only user registrations and then flatten the resulting data into individual user IDs.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

import java.util.Arrays;
import java.util.Properties;

public class StreamsTransformDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-transform-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> rawEvents = builder.stream("raw-events");

        // Filter for registration events; the null check skips records with no value (tombstones)
        KStream<String, String> registrationEvents = rawEvents.filter(
                (key, value) -> value != null && value.contains("\"eventType\":\"REGISTER\""));

        // FlatMap to extract user IDs
        KStream<String, String> userIds = registrationEvents.flatMap((key, value) -> {
            // In a real scenario, you'd parse the JSON to extract the userId.
            // For simplicity, we assume the value contains "userId":"<id>" with no extra whitespace.
            String userId = value.split("\"userId\":\"")[1].split("\"")[0];
            // flatMap expects an Iterable of KeyValue pairs, so return the list itself, not an iterator
            return Arrays.asList(new org.apache.kafka.streams.KeyValue<>(userId, userId));
        });

        userIds.to("user-ids");

        // Start the Kafka Streams application
        org.apache.kafka.streams.KafkaStreams streams = new org.apache.kafka.streams.KafkaStreams(builder.build(), props);
        streams.start();

        // Graceful shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

This code defines a Kafka Streams application that reads from raw-events, filters for messages containing "eventType":"REGISTER", and then uses flatMap to extract and output user IDs to a new topic called user-ids. The filter operation acts as a gatekeeper, allowing only specific events to pass through. flatMap, on the other hand, can transform a single input record into zero, one, or multiple output records. Here, it takes a registration event and produces a single output record: the user ID.
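The string splitting in the demo throws if a registration event has no userId field. As a minimal sketch of a more defensive approach, the helper below pulls the ID out with a regular expression and returns an empty Optional when it is missing. The class and method names (UserIdExtractor, extractUserId) are hypothetical and not part of Kafka Streams, and the regex is an assumption that ignores escaped quotes inside the value; a real application would more likely use a JSON library.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the Kafka Streams API.
final class UserIdExtractor {

    // Matches "userId":"<value>" and tolerates whitespace around the colon.
    // Assumption: the value contains no escaped quotes.
    private static final Pattern USER_ID =
            Pattern.compile("\"userId\"\\s*:\\s*\"([^\"]*)\"");

    private UserIdExtractor() {}

    // Returns the userId if the field is present, otherwise an empty Optional.
    static Optional<String> extractUserId(String json) {
        if (json == null) {
            return Optional.empty();
        }
        Matcher matcher = USER_ID.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // prints "Optional[u42]"
        System.out.println(extractUserId("{\"eventType\":\"REGISTER\",\"userId\":\"u42\"}"));
        // prints "Optional.empty"
        System.out.println(extractUserId("{\"eventType\":\"REGISTER\"}"));
    }
}
```

Inside the flatMap lambda, an empty result could be turned into an empty list so that malformed events are dropped instead of crashing the stream thread.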

The core problem this solves is processing and refining event streams without writing custom consumer code. Kafka Streams handles distribution, fault tolerance, and state management for you. Internally, filter is a stateless operation. For each record, it evaluates a Predicate. If the predicate returns true, the record is passed to the next stage; otherwise, it’s dropped. flatMap is also stateless. It takes a KeyValueMapper that receives an input record’s key and value and returns an Iterable of KeyValue pairs, each of which becomes an output record. This allows for one-to-many transformations.
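To make the per-record contract concrete, here is a toy model in plain Java. It is not how Kafka Streams is implemented; it only mimics the semantics described above over an in-memory list, using Map.Entry in place of a Kafka record. The class name StatelessOpsModel and both method signatures are assumptions made for this sketch.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

// Toy model only: mimics per-record semantics, not the Kafka Streams internals.
final class StatelessOpsModel {

    private StatelessOpsModel() {}

    // Keeps a record only if the predicate returns true for its key and value.
    static <K, V> List<Map.Entry<K, V>> filter(
            List<Map.Entry<K, V>> input, BiPredicate<K, V> predicate) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : input) {
            if (predicate.test(record.getKey(), record.getValue())) {
                out.add(record);
            }
        }
        return out;
    }

    // Replaces each record with whatever the mapper returns: zero, one, or many records.
    static <K, V, KR, VR> List<Map.Entry<KR, VR>> flatMap(
            List<Map.Entry<K, V>> input,
            BiFunction<K, V, Iterable<Map.Entry<KR, VR>>> mapper) {
        List<Map.Entry<KR, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : input) {
            for (Map.Entry<KR, VR> produced : mapper.apply(record.getKey(), record.getValue())) {
                out.add(produced);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("k1", "REGISTER"));
        input.add(new SimpleEntry<>("k2", "LOGIN"));

        List<Map.Entry<String, String>> kept = filter(input, (k, v) -> v.equals("REGISTER"));
        System.out.println(kept.size()); // prints 1
    }
}
```

Neither method keeps anything between iterations of the loop, which is what "stateless" means here: the outcome for one record never depends on the records seen before it.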

A common misconception is that flatMap always produces more records than it consumes. This isn’t true; it can produce zero records (effectively acting like a filter) or exactly one record. The key is its ability to potentially produce multiple records from a single input, which is crucial for scenarios like splitting comma-separated values within a single field into individual records.
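The comma-separated case can be sketched with a small splitting function. The helper below is hypothetical (CsvSplitter and splitItems are names invented for this example) and assumes that blank items should be skipped; it shows the zero, one, and many outcomes in plain Java.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: splits one comma-separated value into individual items.
final class CsvSplitter {

    private CsvSplitter() {}

    // Returns one element per non-blank item; an empty list means "emit nothing".
    static List<String> splitItems(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
            return Collections.emptyList(); // zero output records
        }
        List<String> items = new ArrayList<>();
        for (String part : csv.split(",")) {
            String item = part.trim();
            if (!item.isEmpty()) {
                items.add(item);
            }
        }
        return items;
    }

    public static void main(String[] args) {
        System.out.println(splitItems(""));                  // prints []
        System.out.println(splitItems("sports"));            // prints [sports]
        System.out.println(splitItems("sports, news,tech")); // prints [sports, news, tech]
    }
}
```

In a topology, a function like this could be passed to flatMapValues, for example stream.flatMapValues(CsvSplitter::splitItems), which emits one record per item and leaves the key unchanged.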

The next concept you’ll encounter is how to handle more complex transformations that require looking at multiple events or maintaining state, like aggregations or joins.
