The Kafka Connect S3 sink connector can archive Kafka topics to Amazon S3, but it is not a simple "set it and forget it" operation. What matters most is how it partitions your data in S3, because that layout directly determines how efficiently you can query it later.

Let’s see it in action. Imagine you have a Kafka topic named user_events with JSON messages like this:

{"user_id": "abc123", "event_type": "login", "timestamp": 1678886400000}
{"user_id": "xyz789", "event_type": "logout", "timestamp": 1678886460000}
{"user_id": "abc123", "event_type": "purchase", "timestamp": 1678886520000}

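You want to sink this to S3, partitioned by time so that each hour of events lands under its own prefix. Splitting by event_type as well is a common wish, but the connector's built-in partitioners handle either a record field or time, not both, so that combination needs a custom partitioner.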

Here’s a snippet of the Kafka Connect S3 Sink connector configuration:

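name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=user_events
s3.region=us-east-1
s3.bucket.name=my-kafka-data-bucket
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
# Partition by time; the partitioner classes live in the shared storage package
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
# One partition per hour (in milliseconds)
partition.duration.ms=3600000
# Date-time pattern; text in single quotes is emitted literally
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
# Take the time from the "timestamp" field of each message instead of the wall clock
timestamp.extractor=RecordField
timestamp.field=timestamp
flush.size=1000
rotate.interval.ms=60000
schema.compatibility=NONE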

With this configuration, records from the user_events topic are written to the bucket under topics/user_events/. The partitioner.class setting selects the TimeBasedPartitioner, and path.format tells it how to turn each record's timestamp into a directory path.

Here’s what the S3 bucket my-kafka-data-bucket might look like after some data has arrived, assuming an hourly, Hive-style path.format such as 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and the default topics.dir of topics:

my-kafka-data-bucket/
└── topics/
    └── user_events/
        └── year=2023/
            └── month=03/
                └── day=15/
                    └── hour=13/
                        ├── user_events+0+0000000000.json
                        └── user_events+0+0000001000.json

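The path.format string is crucial because it defines the directory structure. It is a date-time pattern applied to each record's timestamp: letters such as YYYY, MM, dd, and HH are replaced with the year, month, day, and hour, while anything in single quotes is copied into the path literally. The pattern never reads record fields, so a quoted token such as 'event_type' produces the literal text event_type, not the value of that field. Object names follow the form <topic>+<kafka partition>+<start offset>.<extension>, for example user_events+0+0000000000.json, where the offset is that of the first record in the file.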
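To make the timestamp-to-path mapping concrete, here is a minimal Python sketch. It is not the connector's code: the helper name partition_path is hypothetical, and it assumes an hourly, Hive-style layout (year=/month=/day=/hour=) evaluated in UTC.

```python
from datetime import datetime, timezone


def partition_path(timestamp_ms: int) -> str:
    """Return the hourly, Hive-style directory for an epoch-millisecond timestamp.

    Hypothetical helper that imitates a pattern like
    'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH evaluated in UTC.
    """
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return ts.strftime("year=%Y/month=%m/day=%d/hour=%H")


# The three sample events fall within the same hour, so they share one directory.
for ts_ms in (1678886400000, 1678886460000, 1678886520000):
    print(partition_path(ts_ms))
```

All three sample timestamps resolve to 13:00 UTC on 15 March 2023, which is why a single hour directory would hold them.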

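The partition.duration.ms setting works together with path.format to group data. It is the length of one time partition in milliseconds, so 3600000 starts a new partition every hour and should match the finest unit in path.format (HH here). The TimeBasedPartitioner has no separate interval unit and size settings. The partition duration decides which directory a record lands in; when files are committed is governed by flush.size and the rotation settings.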

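The flush.size=1000 and rotate.interval.ms=60000 settings control when files are committed. flush.size is the number of records written to a file before it is committed as a single S3 object. rotate.interval.ms is the maximum time span a file may cover, measured with record timestamps from the configured timestamp extractor: when a record arrives whose timestamp is at least 60000 ms later than the first record in the open file, that file is committed. Because the check runs only when a record arrives, a quiet topic can leave a file open indefinitely. rotate.schedule.interval.ms rotates on wall-clock time instead, at the cost of the connector's exactly-once guarantees.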
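The interaction between the two settings is easier to see in a small simulation. The sketch below is a simplified model, not the connector's implementation: split_into_files is a hypothetical function that closes a file when it holds flush_size records, or when an incoming record is at least rotate_interval_ms later than the first record in the open file.

```python
from typing import Iterable, List


def split_into_files(
    timestamps_ms: Iterable[int],
    flush_size: int = 1000,
    rotate_interval_ms: int = 60000,
) -> List[List[int]]:
    """Group record timestamps into simulated committed files."""
    files: List[List[int]] = []
    current: List[int] = []
    for ts in timestamps_ms:
        # Time-based rotation is only evaluated when a new record arrives.
        if current and ts - current[0] >= rotate_interval_ms:
            files.append(current)
            current = []
        current.append(ts)
        # Size-based rotation: close the file once it is full.
        if len(current) >= flush_size:
            files.append(current)
            current = []
    # Records still in `current` sit in an open file that is not yet committed.
    return files


# The three sample events are one minute apart.
committed = split_into_files([1678886400000, 1678886460000, 1678886520000])
print(len(committed), "files committed")
```

In this model the first two events each end up in their own committed file, while the third stays in an open file until a later record (or a full buffer) triggers the next rotation.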

The S3 Sink connector’s partitioning strategy is the primary mechanism for organizing your Kafka data in S3 for efficient querying. By default, the connector uses DefaultPartitioner, which mirrors Kafka's own partitions (partition=0, partition=1, and so on). Switching to TimeBasedPartitioner with a suitable path.format, to FieldPartitioner, or to a custom partitioner lets the layout reflect how you actually query the data. For instance, if your Kafka messages had a customer_id field, FieldPartitioner with partition.field.name=customer_id would write each record under a customer_id=<value>/ directory. Layouts like these allow tools such as Athena or Spark to prune partitions, which can speed up queries dramatically.

When you have a large number of Kafka topics or high-throughput topics, the choice of partitioning strategy becomes critical. A poorly chosen strategy can lead to "hot" partitions (too many files in one directory) or an explosion of tiny files, both of which degrade query performance and increase S3 costs. The built-in partitioners are DefaultPartitioner, FieldPartitioner (partitioning on specific record fields), TimeBasedPartitioner, and its DailyPartitioner and HourlyPartitioner shortcuts. Compression is configured separately from partitioning; for JSON output, for example, s3.compression.type=gzip. You can also implement custom partitioners if your needs are highly specialized.

The schema.compatibility=NONE setting is often used in S3 Sink connectors when you’re not using Avro or Protobuf and don’t need schema evolution tracking within the S3 data itself. For JSON, this is typical. If you were using Avro, you’d likely set format.class=io.confluent.connect.s3.format.avro.AvroFormat and manage schema compatibility accordingly.

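The real power of this setup comes when you query the data with a service like Amazon Athena. If the directories are Hive-style key=value pairs (for example year=2023/month=03/day=15/hour=13) and the partitions are registered in the table's metadata, Athena treats them as partition columns. A query like SELECT * FROM user_events WHERE year = '2023' AND month = '03' AND day = '15' then scans only the my-kafka-data-bucket/topics/user_events/year=2023/month=03/day=15/ prefix instead of the entire bucket. A filter on event_type would still work, but by reading the records inside that prefix rather than by pruning directories. Plain paths such as 2023/03/15 are not recognized automatically; they need partitions added manually or through partition projection.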
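The effect of partition pruning can be illustrated with a few lines of Python. This is only a sketch under stated assumptions: the object keys are made-up examples in a Hive-style layout, prune is a hypothetical helper, and real engines such as Athena plan from partition metadata in the catalog rather than by matching strings.

```python
from typing import List


def prune(keys: List[str], **partition_filters: str) -> List[str]:
    """Keep only object keys whose key=value path segments match every filter."""
    wanted = {f"{name}={value}" for name, value in partition_filters.items()}
    return [key for key in keys if wanted.issubset(key.split("/"))]


# Hypothetical object keys for three different days.
keys = [
    "topics/user_events/year=2023/month=03/day=14/hour=23/user_events+0+0000000000.json",
    "topics/user_events/year=2023/month=03/day=15/hour=13/user_events+0+0000001000.json",
    "topics/user_events/year=2023/month=03/day=16/hour=09/user_events+0+0000002000.json",
]

# Only objects under the requested day need to be read.
for key in prune(keys, year="2023", month="03", day="15"):
    print(key)
```

The fewer objects survive this filtering step, the less data the query engine has to read and the less the query costs.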

The rotate.interval.ms parameter deserves particular attention because it caps how much time a single file may cover before the connector commits it to S3. If this value is too low (e.g., 1000 ms), you’ll end up with many small files, which slows S3 list operations, hurts query performance, and increases request costs. If it’s too high, data takes longer to appear in S3, because an object only becomes visible once its file is committed. The optimal value depends on your data volume and latency requirements.
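A quick back-of-the-envelope calculation shows how fast the object count grows as the interval shrinks. The sketch below is a rough estimate, not a measurement: files_per_day is a hypothetical helper that assumes every Kafka partition receives records continuously, and the figure of 12 partitions is an arbitrary example. Rotation caused by flush.size or partition boundaries would add further files.

```python
import math


def files_per_day(rotate_interval_ms: int, kafka_partitions: int) -> int:
    """Estimate objects written per day from time-based rotation alone."""
    ms_per_day = 24 * 60 * 60 * 1000
    return kafka_partitions * math.ceil(ms_per_day / rotate_interval_ms)


# Compare a 1-second, 1-minute, and 10-minute rotation interval.
for interval_ms in (1_000, 60_000, 600_000):
    print(interval_ms, files_per_day(interval_ms, kafka_partitions=12))
```

Under these assumptions, a 1-second interval yields roughly a million objects per day, while a 1-minute interval yields about seventeen thousand.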

The path.format syntax follows Joda-Time's DateTimeFormat patterns. Pattern letters such as YYYY, MM, dd, HH, mm, and ss are replaced with components of the record's timestamp, and text wrapped in single quotes (like 'year'=) is emitted literally, which is how Hive-style key=value directories are built. Field names are not part of the syntax; partitioning on a field's value is handled by FieldPartitioner through partition.field.name.

Once you have your data partitioned correctly in S3, the next logical step is to leverage services like AWS Glue Crawlers to discover the schema and create tables in the AWS Glue Data Catalog, making it easily queryable by Athena or other AWS analytics services.
