The surprising truth about Kafka Streams serialization is that the format you choose matters far less than how consistently you apply it across your entire Kafka ecosystem.

Let’s see this in action. Imagine a simple Kafka Streams application that consumes raw JSON events, transforms them, and produces new JSON events.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class SimpleJsonProcessor {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // Using StringSerde for JSON

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("raw-json-topic");

        KStream<String, String> transformed = source.mapValues(value -> {
            // In a real app, you'd parse JSON, transform, and then serialize back to JSON string
            System.out.println("Processing: " + value);
            return value.toUpperCase(); // Simple transformation
        });

        transformed.to("processed-json-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

In this example, we’re using Serdes.String().getClass() for both keys and values. This means Kafka Streams will treat our JSON data as plain strings. The producer will send a JSON string to raw-json-topic, and the Streams app will read it as a string, process it (here, just converting to uppercase as a placeholder for actual JSON parsing/transformation), and then write the resulting string to processed-json-topic.

This works because the producer, the Streams app, and downstream consumers all agree on String serialization. However, treating structured data as opaque strings is inefficient and brittle once the data gets complex.

The Problem with Raw Strings for Complex Data

When you pass structured data around as raw JSON strings, you run into several issues:

  1. Inefficiency: JSON is verbose. Every message repeats every field name as text, so even simple objects take up significant space, increasing network I/O and disk usage.
  2. Schema Evolution: If your JSON structure changes (e.g., adding a new field, changing a field type), downstream consumers might break if they aren’t designed to handle the new schema. Managing schema versions becomes a manual, error-prone process.
  3. Type Safety: Working with JSON as strings means you’re constantly parsing and re-serializing by hand, and you lose compile-time type checking. A misspelled field name or an unexpected type only shows up as a runtime error.
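To put a rough number on the verbosity point, the JDK-only sketch below encodes one event both as JSON text and as a values-only binary layout written with DataOutputStream. The UserEvent record is hypothetical, and the binary layout is only a stand-in that mimics the key idea of schema-driven formats (field names live in the schema, not in every message); it is not Avro's or Protobuf's actual wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class EncodingSizeDemo {

    // Hypothetical event type, used only for this comparison.
    public record UserEvent(int id, String name, boolean active) {}

    // JSON text: every message repeats every field name.
    // (No string escaping here; illustration only.)
    public static byte[] toJson(UserEvent e) {
        String json = "{\"id\":" + e.id()
                + ",\"name\":\"" + e.name()
                + "\",\"active\":" + e.active() + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Values only, in an order both sides agree on in advance (the "schema").
    // DataOutputStream is a stand-in; it is not a real Avro or Protobuf encoder.
    public static byte[] toBinary(UserEvent e) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(e.id());         // 4 bytes
            out.writeUTF(e.name());       // 2-byte length prefix + UTF-8 bytes
            out.writeBoolean(e.active()); // 1 byte
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        UserEvent e = new UserEvent(42, "alice", true);
        System.out.println("JSON bytes:   " + toJson(e).length);   // 38
        System.out.println("binary bytes: " + toBinary(e).length); // 12
    }
}
```

For this record the JSON form is 38 bytes and the binary form 12. The exact ratio depends on field names and values, and real Avro uses variable-length integers, so its sizes would differ from this sketch.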

Embracing Structured Serialization

This is where Avro, Protobuf, and proper JSON serialization (using libraries like Jackson) come in. They provide:

  • Compact Binary Formats: Avro and Protobuf serialize data into binary encodings that leave field names out of each message, so messages are typically much smaller than the equivalent JSON.
  • Schema Management: Avro and Protobuf are schema-driven. You define your data structure once, and the schema is used for both serialization and deserialization. This enables robust schema evolution: you can add or remove fields without breaking existing consumers, as long as each change follows the format’s compatibility rules (in Avro, for example, a newly added field generally needs a default value).
  • Type Safety: When integrated correctly, these formats allow you to work with strongly-typed objects, catching errors at compile time rather than runtime.
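The schema-evolution point rests on a few resolution rules. In Avro, for instance, when the reader’s schema differs from the writer’s, fields the reader doesn’t know are ignored, fields missing from the data are filled from the reader’s defaults, and a missing field with no default is an error. The sketch below models those rules in plain Java, with decoded records as maps and a hypothetical v2 schema that adds a country field; it is a simplified illustration, not Avro’s API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class SchemaResolutionDemo {

    // Hypothetical reader schema (v2): field name -> optional default value.
    // "country" was added in v2 with a default, which keeps old (v1) data readable.
    static final Map<String, Optional<Object>> READER_SCHEMA = new LinkedHashMap<>();
    static {
        READER_SCHEMA.put("id", Optional.empty());
        READER_SCHEMA.put("name", Optional.empty());
        READER_SCHEMA.put("country", Optional.<Object>of("unknown"));
    }

    // Resolve a record decoded with the writer's schema against the reader's schema.
    public static Map<String, Object> resolve(Map<String, Object> written) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Object>> field : READER_SCHEMA.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));       // present in the data
            } else if (field.getValue().isPresent()) {
                result.put(name, field.getValue().get());  // filled from the reader's default
            } else {
                throw new IllegalArgumentException("missing field without default: " + name);
            }
        }
        // Writer-only fields are never copied, so they are silently ignored.
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> v1Record = Map.<String, Object>of("id", 7, "name", "alice", "legacyFlag", true);
        System.out.println(resolve(v1Record)); // {id=7, name=alice, country=unknown}
    }
}
```

Because the new field carries a default, a v2 reader can still consume every v1 message; without the default, the same read would fail.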

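Let’s consider Avro. In Kafka Streams you normally don’t configure KafkaAvroSerializer and KafkaAvroDeserializer directly; instead you use SpecificAvroSerde (or GenericAvroSerde), which wraps them. This requires a Schema Registry (like Confluent Schema Registry) and Avro schemas, typically compiled into Java classes.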

Example Configuration Snippet for Avro:

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;

// ... other properties
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);  // keys stay plain strings
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); // values are Avro records
// Kafka Streams hands its configuration to the default Serdes when it configures them,
// so the Schema Registry URL set here reaches SpecificAvroSerde.
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // your Schema Registry URL
// No key/value serializer or deserializer settings are needed for the internal clients:
// Kafka Streams always has them exchange raw bytes and applies the Serdes itself.

In this Avro setup, SpecificAvroSerde handles serialization and deserialization using schemas stored in your Schema Registry. A stream such as KStream<String, MyAvroObject>, where MyAvroObject is a class generated from your Avro schema (a SpecificRecord), now carries strongly-typed Avro objects rather than raw strings, and the transformation logic operates directly on those objects.

The Key to Consistency: The Serde

The Serde (Serializer/Deserializer) is the linchpin. In Kafka Streams, you configure DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG to tell the application how to convert keys and values (your Java objects, or simple types like String and Long) into bytes for Kafka and back again.

  • For JSON: You’d typically use a JSON Serde, either from a library (for example, Confluent’s JSON Schema Serde) or a custom implementation built on Jackson. It serializes your Java POJOs to JSON bytes and deserializes those bytes back into POJOs.
  • For Avro: You use SpecificAvroSerde or GenericAvroSerde, both of which rely on a Schema Registry to manage schemas and perform serialization/deserialization.
  • For Protobuf: You’d use a Protobuf Serde (such as Confluent’s), which works with Java classes generated from a .proto schema definition.
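Whichever format you pick, a Serde is a matched serializer and deserializer for one type. As a JDK-only sketch, the hypothetical Codec interface below mirrors the method shapes of Kafka’s Serializer.serialize(topic, data) and Deserializer.deserialize(topic, bytes) (it is not Kafka’s interface), and ORDER_CODEC implements it for a hypothetical Order record, using a toy pipe-delimited text format in place of JSON, Avro, or Protobuf.

```java
import java.nio.charset.StandardCharsets;

public class CodecDemo {

    // Hypothetical stand-in for Kafka's Serde: one serializer plus one matching deserializer.
    // The (topic, data) parameters mirror Kafka's Serializer/Deserializer method shapes.
    public interface Codec<T> {
        byte[] serialize(String topic, T data);
        T deserialize(String topic, byte[] bytes);
    }

    // Hypothetical domain type.
    public record Order(String orderId, long amountCents) {}

    // Toy text format: "orderId|amountCents" in UTF-8.
    // Assumes orderId never contains '|'; a real Serde would use JSON, Avro, or Protobuf.
    public static final Codec<Order> ORDER_CODEC = new Codec<>() {
        @Override
        public byte[] serialize(String topic, Order order) {
            if (order == null) return null; // keep null values (tombstones) as null
            return (order.orderId() + "|" + order.amountCents()).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Order deserialize(String topic, byte[] bytes) {
            if (bytes == null) return null;
            String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|", 2);
            return new Order(parts[0], Long.parseLong(parts[1]));
        }
    };

    public static void main(String[] args) {
        Order original = new Order("o-1001", 2599L);
        byte[] wire = ORDER_CODEC.serialize("orders", original);
        Order roundTripped = ORDER_CODEC.deserialize("orders", wire);
        System.out.println(original.equals(roundTripped)); // true
    }
}
```

Keeping both directions in one object is the design point: as long as every producer and consumer of a topic uses the same Serde, the two sides cannot drift apart, and the code in between only ever sees typed Order values.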

The critical point is that every Kafka producer and consumer interacting with a given topic must use compatible serializers and deserializers. If a producer writes Avro and a consumer tries to read it as JSON, you’ll get garbage or errors.
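What a format mismatch looks like in practice can be shown without Kafka at all. In this JDK-only sketch, a producer writes {"id":42} as UTF-8 JSON while a consumer expects a hypothetical binary layout whose first four bytes are a big-endian int. Neither side throws, but the consumer reads the wrong number. Decoding binary bytes as UTF-8 text (roughly what a String deserializer does) fails in the same quiet way.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerdeMismatchDemo {

    // Producer using format A: the JSON text {"id":<id>} encoded as UTF-8.
    public static byte[] writeJson(int id) {
        return ("{\"id\":" + id + "}").getBytes(StandardCharsets.UTF_8);
    }

    // Producer using format B (hypothetical binary layout): a 4-byte big-endian int.
    public static byte[] writeBinaryId(int id) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
    }

    // Consumer expecting format B: reads the first 4 bytes as a big-endian int.
    // It has no way to tell that the bytes were written in another format.
    public static int readBinaryId(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        // Matching formats: the value survives the round trip.
        System.out.println(readBinaryId(writeBinaryId(42))); // 42

        // Mismatch: the bytes of {"id are read as one int. No exception, just a wrong value.
        System.out.println(readBinaryId(writeJson(42)));     // 2065852772

        // Mismatch the other way: binary bytes decoded as UTF-8 text give three NUL
        // characters and '*', not JSON, again without any exception.
        String asText = new String(writeBinaryId(42), StandardCharsets.UTF_8);
        System.out.println(asText.startsWith("{"));          // false
    }
}
```

Silent corruption like this is often worse than an exception, because the bad values keep flowing downstream before anything fails.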

The most surprising thing to many is that StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG apply to the whole application. When you set them to Serdes.String().getClass(), every operation that neither specifies its own Serde nor inherits one from an upstream operation (source topics, sink topics, repartition topics, and state stores) uses String serialization. If you then call builder.stream("my-avro-topic") on a topic whose data was written as Avro, the default StringDeserializer decodes the Avro bytes as text: you get garbled strings at the source, and the failure surfaces later in your topology. You must either pass the correct Serde explicitly for that operation or, if all topics use the same format, set the default Serdes accordingly.

The Hidden Trap: Topic-Specific Serdes

While DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG are convenient, many applications mix formats across topics. In such cases you override the defaults per operation: pass a Consumed to builder.stream() or builder.table(), a Produced to to(), a Grouped to groupByKey() or groupBy(), a Repartitioned to repartition() (which replaces the deprecated through()), and a Materialized to stateful operations such as aggregations.

For example, to read from an Avro topic while your application’s default Serdes are JSON:

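import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import java.util.HashMap;
import java.util.Map;

// Assuming the application's default Serdes are JSON.
// MyAvroObject stands for a class generated from your Avro schema.
// Create and configure the Avro Serde first, with your Schema Registry URL.
SpecificAvroSerde<MyAvroObject> avroSerde = new SpecificAvroSerde<>();
Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
avroSerde.configure(serdeConfig, false); // false = this Serde is for record values, not keys

// Then pass that configured instance explicitly, overriding the defaults for this topic only.
KStream<String, MyAvroObject> avroStream = builder.stream(
    "my-avro-topic",
    Consumed.with(Serdes.String(), avroSerde)
);

// ... then use avroStream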

This ability to mix and match (carefully!) is where the real power and complexity lie. You can have one Kafka Streams app consume JSON, another consume Avro, and even have one app consume JSON from one topic and Avro from another, as long as each component (producer, consumer, or the Streams app itself) uses the correct Serde for the data it’s handling.

The next hurdle you’ll face is understanding how Kafka Streams handles state stores and how serialization impacts their persistence and recovery.
