Kafka Connect’s CDC (Change Data Capture) connectors are the secret sauce for streaming database modifications live.
Let’s watch it in action with Debezium, a popular CDC tool, and PostgreSQL.
First, we need a PostgreSQL database with logical replication enabled. Logical decoding is the mechanism that lets PostgreSQL turn its write-ahead log into a stream of row-level changes.
-- In PostgreSQL:
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 5;
ALTER SYSTEM SET max_wal_senders = 5;
-- Restart PostgreSQL after these changes
wal_level = logical tells PostgreSQL to write enough information to the Write-Ahead Log (WAL) for logical decoding, that is, for reconstructing row-level changes. max_replication_slots and max_wal_senders cap how many replication slots and concurrent WAL sender processes the server allows. Each Debezium connector needs one slot and one sender, so if none are free, the connector can’t start streaming.
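After the restart, it is worth confirming that the settings took effect (for example with SHOW wal_level, or by querying pg_settings). The Python sketch below checks such values once you have them as a dictionary. The helper name check_replication_settings and the minimum of 1 are assumptions for a single connector, not Debezium requirements.

```python
# Hypothetical helper: validates values read from pg_settings.
# The minimums assume exactly one Debezium connector on this server.
MINIMUMS = {"max_replication_slots": 1, "max_wal_senders": 1}


def check_replication_settings(settings):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for name, minimum in MINIMUMS.items():
        # pg_settings reports values as text, so convert before comparing.
        value = int(settings.get(name, 0))
        if value < minimum:
            problems.append(f"{name} must be at least {minimum}, got {value}")
    return problems
```

An empty result means the three settings from the snippet above are in place; it says nothing about privileges or network access.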
Next, we set up a Kafka Connect cluster. This is the framework that runs our Debezium connector. A minimal setup might involve a connect-distributed.properties file.
# connect-distributed.properties
bootstrap.servers=kafka-broker-1:9092,kafka-broker-2:9092
group.id=my-connect-group
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
plugin.path=/usr/share/java/kafka-connect/
The plugin.path is where Kafka Connect looks for connector JAR files. You’d typically place the Debezium PostgreSQL connector JAR here. bootstrap.servers points to your Kafka cluster. The converter settings determine how data is serialized; JSON is common for simplicity, though Avro is often preferred for schema evolution.
Now, we deploy the Debezium PostgreSQL connector configuration to Kafka Connect.
# debezium-postgres-connector.json
{
"name": "my-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "your-postgres-host",
"database.port": 5432,
"database.user": "debezium_user",
"database.password": "debezium_password",
"database.dbname": "your_database_name",
"database.server.name": "postgres-server-1",
"plugin.name": "pgoutput",
"table.include.list": "public.users,public.orders",
"schema.history.internal.kafka.bootstrap.servers": "kafka-broker-1:9092",
"schema.history.internal.kafka.topic": "schema-changes.postgres-server-1"
}
}
This configuration tells Debezium:
- Which connector class to use (PostgresConnector).
- How to connect to PostgreSQL (hostname, port, credentials, dbname).
- A logical database.server.name, which prefixes your Kafka topics (e.g., postgres-server-1.public.users). Debezium 2.0 and later call this property topic.prefix.
- plugin.name: "pgoutput" specifies the PostgreSQL output plugin Debezium uses for logical decoding. This is critical; if it names a plugin the server does not have, nothing streams.
- table.include.list filters which tables we want to track.
- schema.history.internal.kafka.topic names the topic where Debezium connectors that track DDL store schema change events. The PostgreSQL connector does not rely on it, so it appears here mainly for completeness.
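Once this configuration is posted to Kafka Connect (e.g., via curl -X POST -H "Content-Type: application/json" --data @debezium-postgres-connector.json http://connect-api-host:8083/connectors), Debezium creates a replication slot, by default takes an initial snapshot of the included tables, and then streams changes from PostgreSQL’s WAL through that slot.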
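For scripting, the same request can be prepared with Python’s standard library. This is a minimal sketch that only builds the request; the host connect-api-host:8083 comes from the curl example, and build_create_request is a hypothetical helper name, not part of any Kafka Connect client.

```python
import json
import urllib.request


def build_create_request(connect_url, connector):
    """Build (but do not send) the POST that registers a connector.

    `connector` is the same {"name": ..., "config": {...}} document
    stored in debezium-postgres-connector.json.
    """
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the result to urllib.request.urlopen sends it. A reachable Kafka Connect worker is assumed, and the worker should answer with 201 Created when the connector is registered.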
When you insert a row into public.users:
-- In PostgreSQL:
INSERT INTO public.users (id, name, email) VALUES (1, 'Alice', 'alice@example.com');
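Debezium reads this change from the WAL through logical decoding, transforms it into a structured event (typically JSON or Avro), and publishes it to a Kafka topic named postgres-server-1.public.users. With the JSON converter and schemas.enable=true, the message looks something like this (simplified); with schemas.enable=false, as in the worker configuration above, only the contents of payload are emitted: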
{
"schema": { ... },
"payload": {
"before": null,
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com"
},
"source": { ... },
"op": "c", // 'c' for create
"ts_ms": 1678886400000
}
}
The op: "c" signifies a create operation. If you update the user:
-- In PostgreSQL:
UPDATE public.users SET email = 'alice.updated@example.com' WHERE id = 1;
The Kafka message would be:
{
"schema": { ... },
"payload": {
"before": {
"id": 1,
"name": "Alice",
"email": "alice@example.com"
},
"after": {
"id": 1,
"name": "Alice",
"email": "alice.updated@example.com"
},
"source": { ... },
"op": "u", // 'u' for update
"ts_ms": 1678886500000
}
}
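Notice the before and after states, which are the essence of CDC. PostgreSQL only supplies the full before image when the table is set to REPLICA IDENTITY FULL; with the default replica identity, before is null or carries only the primary key columns. A delete would show after: null and op: "d", and by default Debezium follows it with a tombstone (a message with the same key and a null value) so that log compaction can eventually drop the key.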
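On the consumer side, the before and after images make it easy to work out what actually changed. The following Python sketch is one way to do that for an already-deserialized message; unwrap and changed_columns are hypothetical names, and the code assumes the event shape shown in the examples above.

```python
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


def unwrap(message):
    """Return the change event, with or without the schema/payload wrapper."""
    if message is None:
        return None  # tombstone: a record whose value is null
    if "schema" in message and "payload" in message:
        return message["payload"]
    return message


def changed_columns(message):
    """Return (operation, {column: (old, new)}) for one change event."""
    event = unwrap(message)
    if event is None:
        return ("tombstone", {})
    # before/after may be null (creates, deletes, default replica identity).
    before = event.get("before") or {}
    after = event.get("after") or {}
    diff = {
        column: (before.get(column), after.get(column))
        for column in set(before) | set(after)
        if before.get(column) != after.get(column)
    }
    return (OPERATIONS.get(event["op"], "unknown"), diff)
```

For the update event above, this reports an update in which only the email column differs. When before is missing, every column in after is reported as changed from None, so the diff is only as good as the table’s replica identity allows.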
The database.server.name in the connector config is not just an arbitrary name; it’s the primary namespace for your Kafka topics. This allows you to run multiple CDC connectors from different databases or even different instances of the same database against the same Kafka cluster without topic name collisions. For instance, you might have postgres-server-1.public.users and mysql-server-a.sales.orders.
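The naming scheme is simple enough to express in a few lines of Python. This sketch assumes the prefix, schema, and table names contain only characters that are valid in Kafka topic names, and that schema and table names contain no dots; topic_name and split_topic are hypothetical helpers.

```python
def topic_name(prefix, schema, table):
    """Compose the topic a table's change events are written to."""
    return f"{prefix}.{schema}.{table}"


def split_topic(topic):
    """Split a topic back into (prefix, schema, table).

    Splitting from the right keeps a prefix that itself contains dots intact.
    """
    prefix, schema, table = topic.rsplit(".", 2)
    return prefix, schema, table
```

A consumer that subscribes to many CDC topics can use split_topic to route each message by source database and table.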
The schema.history.internal.kafka.topic setting is often misunderstood. Debezium connectors that have to parse DDL from the source log (MySQL, SQL Server, and Oracle, for example) use this topic to record schema changes such as ALTER TABLE, so that after a restart they can recover the table schemas that were in effect at any point in the log. The topic is for the connector’s internal use; consumers are not meant to read it. The PostgreSQL connector, by contrast, gets table structure from the logical decoding stream and does not maintain a schema history topic, so the two schema.history.internal properties can be left out of this configuration. Downstream applications learn the schema from the schema block of each event, or from a schema registry when Avro is used.
The real power comes when you have multiple consumers on these Kafka topics. A data warehouse can ingest these events to build near real-time analytics. A search index can be updated instantly. Microservices can react to database changes as they happen, decoupled from the database itself.
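As an illustration of such a consumer, here is a minimal Python sketch that keeps an in-memory copy of a table up to date from a stream of change events. It assumes unwrapped events (the contents of payload), a single-column primary key named id, and that delete events carry the key in before; apply_change is a hypothetical helper, and reading from Kafka is left out.

```python
def apply_change(table, event, key="id"):
    """Apply one change event to `table`, a dict keyed by primary key."""
    op = event["op"]
    before = event.get("before") or {}
    after = event.get("after") or {}
    if op in ("c", "r", "u"):  # create, snapshot read, update
        old_key = before.get(key)
        if old_key is not None and old_key != after[key]:
            table.pop(old_key, None)  # the primary key itself changed
        table[after[key]] = after
    elif op == "d":
        table.pop(before[key], None)
    return table
```

Replaying the insert, update, and delete from the earlier examples through apply_change leaves the dictionary with Alice’s row, then her updated row, then empty. A search indexer or cache warmer follows the same pattern with a different target store.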
The plugin.name parameter is one of the most frequently misunderstood parts of the Debezium PostgreSQL connector. pgoutput is the recommended plugin: it is built into PostgreSQL 10 and above, so it needs no installation and no entry in shared_preload_libraries. The alternative, decoderbufs, must be installed on the database server. wal2json was supported by older Debezium releases but was removed in Debezium 2.0. If your connector starts, but no messages appear and you see no errors, double-check that plugin.name matches a plugin that is actually available on the server, that the connector’s replication slot shows up as active in pg_replication_slots, and that the tracked tables are listed in pg_publication_tables for the connector’s publication.
The next thing you’ll likely grapple with is managing schema evolution on the Kafka topics themselves, especially if you’re not using Avro converters, which can lead to consumer-side parsing issues as your database schema changes over time.