Kafka Connect is Kafka’s framework for moving data between Kafka and external systems, and the Elasticsearch Sink connector is a popular way to push that data into Elasticsearch for searching and analysis.

Let’s see it in action. Imagine you have a Kafka topic named ecommerce.orders with JSON messages like this:

{
  "order_id": "ORD12345",
  "customer_id": "CUST9876",
  "order_date": "2023-10-27T10:00:00Z",
  "total_amount": 150.75,
  "items": [
    {"product_id": "PROD001", "quantity": 2, "price": 50.00},
    {"product_id": "PROD005", "quantity": 1, "price": 50.75}
  ]
}

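You want this data in Elasticsearch. By default the connector writes each topic to an index of the same name, so the target here is an index called ecommerce.orders (writing to a differently named index such as orders would need a rename step, for example the RegexRouter transform). Here’s a simplified Kafka Connect configuration for the Elasticsearch Sink: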

{
  "name": "my-es-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "ecommerce.orders",
    "connection.url": "http://elasticsearch-node:9200",
    "type.name": "_doc",
    "schema.ignore": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "batch.size": "100",
    "max.retries": "5",
    "retry.backoff.ms": "1000"
  }
}

When a new order arrives on the ecommerce.orders Kafka topic, this connector picks it up, converts the JSON message into a document (with no schema handling, since we’ve told it to ignore schemas for simplicity), and sends it to your Elasticsearch cluster. Elasticsearch then indexes this document, making it searchable. The connection.url points to your Elasticsearch instance, topics specifies which Kafka topics to read from, and type.name: "_doc" sets the document type, which in Elasticsearch 7.x is always _doc. batch.size sets the maximum number of records sent in one bulk request, trading latency against efficiency. max.retries and retry.backoff.ms control how many times a failed request is retried and how long the connector waits before the first retry.
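The name/config wrapper in that configuration is the shape the Kafka Connect REST API accepts on POST /connectors. As a minimal sketch of registering the connector, assuming a Connect worker reachable at http://localhost:8083 (the worker address is an assumption, as are the helper names build_create_request and register), the request could be built like this. The dictionary repeats only a subset of the settings shown above.

```python
import json
import urllib.request

# Assumption: a Kafka Connect worker listens here (8083 is the default REST port).
CONNECT_URL = "http://localhost:8083"

# A subset of the connector settings from the configuration above.
connector = {
    "name": "my-es-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "ecommerce.orders",
        "connection.url": "http://elasticsearch-node:9200",
        "schema.ignore": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
    },
}


def build_create_request(connect_url, connector):
    """Build (but do not send) the POST /connectors request."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register(connect_url, connector):
    """Send the request; needs a running worker and raises on HTTP or network errors."""
    with urllib.request.urlopen(build_create_request(connect_url, connector)) as response:
        return response.status, response.read().decode("utf-8")


request = build_create_request(CONNECT_URL, connector)
print(request.get_method(), request.full_url)
```

Calling register(CONNECT_URL, connector) against a live worker submits the connector; the sketch only builds the request so it can be inspected without a cluster.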

The problem this solves is bridging the gap between streaming event data in Kafka and the need for immediate, rich querying capabilities offered by Elasticsearch. Instead of batch ETL jobs, you get near real-time ingestion. You can then use Kibana to visualize order trends, search for specific orders, or build dashboards showing customer activity.

Internally, the connector’s tasks poll Kafka for new messages on the specified topics and buffer them. Once a batch is full (the connector’s batch.size setting), or the linger.ms wait expires, a task sends a bulk API request to Elasticsearch. This bulk request contains one "index" operation per Kafka message, and Elasticsearch reports success or failure for each operation individually. The key.converter and value.converter settings define how Kafka messages are deserialized; here, we’re using JsonConverter with schemas.enable set to false, so each payload is read as plain JSON rather than as a schema-plus-payload envelope.
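The buffer-then-bulk flow can be modeled in a few lines. This is a toy sketch, not the connector’s actual implementation: the BatchingBuffer class and to_bulk_body function are hypothetical names, the index name assumes the default of matching the topic, and nothing is sent over the network. What it does show accurately is the newline-delimited body format that the Elasticsearch bulk API expects, with an action line followed by the document source.

```python
import json


def to_bulk_body(index, records):
    """Render (doc_id, document) pairs as an Elasticsearch bulk body (NDJSON)."""
    lines = []
    for doc_id, document in records:
        # Action line, then the document source on the following line.
        lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
        lines.append(json.dumps(document))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


class BatchingBuffer:
    """Toy model of buffer-then-flush; not the connector's real code."""

    def __init__(self, index, batch_size):
        self.index = index
        self.batch_size = batch_size
        self.pending = []
        self.sent = []  # bulk bodies that would have been POSTed to /_bulk

    def add(self, doc_id, document):
        self.pending.append((doc_id, document))
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        """Also what a linger timeout would trigger for a partly full batch."""
        if self.pending:
            self.sent.append(to_bulk_body(self.index, self.pending))
            self.pending = []


# Assumption: the index is named after the topic, the connector's default.
buffer = BatchingBuffer(index="ecommerce.orders", batch_size=2)
for n in range(1, 4):
    buffer.add(f"ORD{n}", {"order_id": f"ORD{n}", "total_amount": 10.0 * n})
buffer.flush()  # simulate the timeout draining the last, partial batch

print(len(buffer.sent), "bulk requests")
print(buffer.sent[0], end="")
```

Three records with a batch size of two produce one full batch and one partial batch, which is the trade-off the batch size controls: larger batches mean fewer requests but a longer wait before a record becomes searchable.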

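The schema.ignore: "true" setting is crucial here. With schemas.enable turned off, the JsonConverter hands the connector records with no schema attached, so there is nothing from which to derive an Elasticsearch mapping. Setting schema.ignore to true tells the connector not to try: the JSON payload is sent as-is and Elasticsearch infers field types through dynamic mapping. If you were using Confluent Schema Registry with Avro, you could instead leave it at false and let the connector build mappings from the record schema. The type.name: "_doc" setting is a remnant of older Elasticsearch versions; in 7.x, _doc is the only valid document type and essentially a placeholder, and mapping types are removed entirely in 8.x, so check whether your connector version still expects the setting.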

One thing that often trips people up is how the connector handles document IDs in Elasticsearch. This is governed by the key.ignore setting. With the default, key.ignore: "false", the Kafka message key becomes the Elasticsearch document ID, so the key must be present and convertible to a string; a record with a null key is rejected rather than given a generated ID. With key.ignore: "true", the connector ignores the key and builds the ID from the record’s coordinates as topic+partition+offset. Be mindful of this if you have several Kafka messages with the same key: each one overwrites the same Elasticsearch document, so the index holds only the latest version per key, and a redelivered message is applied idempotently.
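The ID rules are easy to see in a small model. The sketch below approximates the connector’s behavior as described in its documentation (key as ID, or topic+partition+offset when key.ignore is true); the document_id function is a hypothetical helper, and a plain dictionary stands in for the Elasticsearch index.

```python
def document_id(topic, partition, offset, key, key_ignore=False):
    """Approximate how the sink picks an Elasticsearch _id for one record."""
    if key_ignore:
        # key.ignore=true: the ID is built from the record's coordinates.
        return f"{topic}+{partition}+{offset}"
    if key is None:
        # key.ignore=false needs a usable key; this model raises instead.
        raise ValueError("record key is null but key.ignore=false")
    return str(key)


# A dict stands in for the index: writing to an existing _id overwrites it.
index = {}
records = [
    ("ecommerce.orders", 0, 41, "ORD12345", {"total_amount": 150.75}),
    ("ecommerce.orders", 0, 42, "ORD12345", {"total_amount": 99.00}),
]
for topic, partition, offset, key, doc in records:
    index[document_id(topic, partition, offset, key)] = doc

print(index)
print(document_id("ecommerce.orders", 0, 42, "ORD12345", key_ignore=True))
```

Keyed IDs give you one document per order that tracks its latest state; coordinate-based IDs keep every message as its own document, which suits append-only event logs.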

The next step after getting data into Elasticsearch is often transforming it for better querying or analysis, which might lead you to explore Kafka Connect Single Message Transforms (SMTs).
