Kafka brokers and clients can be configured to use SSL/TLS for encryption and SASL for authentication, ensuring secure and authenticated communication.
Let’s see it in action. Imagine a Kafka producer trying to send a message to a broker that requires SSL and SASL/PLAIN authentication.
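// Imports needed by this snippet
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

// Producer configuration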
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// TLS plus SASL: the protocol must be SASL_SSL, otherwise the client never attempts SASL
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/secrets/truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore_password");
// If client auth is required by the broker, uncomment and configure:
// props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/kafka/secrets/keystore.jks");
// props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystore_password");
// SASL configuration
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer_user\" password=\"producer_password\";");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Sending a message
try {
    // get() blocks until the broker acknowledges the record
    RecordMetadata metadata = producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
    System.out.printf("Sent message to partition %d at offset %d%n", metadata.partition(), metadata.offset());
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.close();
}
This producer connects to kafka-broker-1 on port 9093. Because the broker requires both TLS and SASL, the security protocol has to be SASL_SSL; with plain SSL the client would never attempt SASL authentication. The truststore at /etc/kafka/secrets/truststore.jks, opened with the password truststore_password, is used to verify the broker’s certificate. If the broker also required TLS client authentication, the client would present its own certificate from keystore.jks.
For authentication, the SASL mechanism PLAIN is selected. The sasl.jaas.config setting (SASL_JAAS_CONFIG) supplies the credentials producer_user and producer_password through PlainLoginModule. PLAIN is the simplest mechanism: it sends the username and password unhashed, so it is safe only because the TLS layer encrypts the transmission.
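The same client settings can also be assembled from plain string keys, which makes it easier to keep credentials out of source code. The following is a minimal sketch under stated assumptions: the class SaslSslClientProps, its method names, and the environment variable names KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD are hypothetical. It uses SASL_SSL, the security.protocol value that combines TLS with SASL, and it rejects credentials containing quotes or backslashes instead of guessing at escaping rules.

```java
import java.util.Properties;

/** Sketch: build SASL_SSL + PLAIN client settings from string keys (hypothetical helper). */
final class SaslSslClientProps {

    /** Wraps a value in double quotes; refuses characters that would need escaping. */
    static String quoted(String value) {
        if (value.contains("\"") || value.contains("\\")) {
            throw new IllegalArgumentException("value must not contain quotes or backslashes");
        }
        return "\"" + value + "\"";
    }

    /** Builds the sasl.jaas.config value for the PLAIN mechanism. */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=" + quoted(username)
                + " password=" + quoted(password) + ";";
    }

    /** Collects the connection and security settings used in the producer example. */
    static Properties build(String bootstrap, String truststorePath,
                            String truststorePassword, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrap);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical environment variable names; the fallbacks are the example's placeholders.
        String user = System.getenv().getOrDefault("KAFKA_SASL_USERNAME", "producer_user");
        String pass = System.getenv().getOrDefault("KAFKA_SASL_PASSWORD", "producer_password");
        Properties props = build("kafka-broker-1:9093",
                "/etc/kafka/secrets/truststore.jks", "truststore_password", user, pass);
        // Print only the non-secret settings.
        System.out.println(props.getProperty("security.protocol") + " / " + props.getProperty("sasl.mechanism"));
    }
}
```

After adding the key and value serializer settings, the resulting Properties object can be passed to new KafkaProducer<>(props) in the same way as the props object in the example.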
When the producer first connects to the broker (at the latest when producer.send() needs topic metadata), the Kafka client library establishes a TLS connection. During the TLS handshake, it verifies the broker’s certificate against the truststore. Once the TLS connection is established, the client and broker perform the SASL exchange. For PLAIN, the client sends its username and password over the now-encrypted channel. Only if both the TLS handshake and the SASL authentication succeed can the producer send its messages.
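The ordering described above can be made concrete with a small model. This is an illustrative sketch only, not Kafka's internal code: the class ConnectionPhases and the phase labels are hypothetical, while the four enum names are the values Kafka accepts for security.protocol. It shows why a client set to SSL never reaches the SASL step, whereas SASL_SSL runs TLS first and SASL second.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of the connection phases per security protocol. */
final class ConnectionPhases {

    /** The four values accepted for security.protocol. */
    enum SecurityProtocol {
        PLAINTEXT(false, false),
        SSL(true, false),
        SASL_PLAINTEXT(false, true),
        SASL_SSL(true, true);

        final boolean usesTls;
        final boolean usesSasl;

        SecurityProtocol(boolean usesTls, boolean usesSasl) {
            this.usesTls = usesTls;
            this.usesSasl = usesSasl;
        }
    }

    /** Returns the phases a connection goes through, in order. */
    static List<String> phases(SecurityProtocol protocol) {
        List<String> steps = new ArrayList<>();
        steps.add("TCP connect");
        if (protocol.usesTls) {
            steps.add("TLS handshake");        // broker certificate checked against the truststore
        }
        if (protocol.usesSasl) {
            steps.add("SASL authentication");  // for PLAIN: username and password are sent
        }
        steps.add("Kafka requests");           // metadata, produce, fetch
        return steps;
    }

    public static void main(String[] args) {
        for (SecurityProtocol p : SecurityProtocol.values()) {
            System.out.println(p + " -> " + phases(p));
        }
    }
}
```

In this model, SASL_PLAINTEXT performs the SASL step without a preceding TLS handshake, which is the case where PLAIN credentials would cross the network unencrypted.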
This setup solves two problems: confidentiality and authentication. Without SSL, data sent between the client and broker travels in plain text and can be read or altered in transit. Without SASL (or TLS client certificates), anyone who can reach the broker can connect and, unless further restricted, produce or consume data. Combining SSL and SASL ensures that only clients with valid credentials can connect and that all communication, including those credentials, is encrypted.
The broker configuration to support this would look something like this in server.properties (admin and admin_password are placeholder inter-broker credentials, not values taken from the client example):
listeners=SASL_SSL://kafka-broker-1:9093
security.inter.broker.protocol=SASL_SSL
ssl.keystore.location=/etc/kafka/secrets/kafka.server.keystore.jks
ssl.keystore.password=server_keystore_password
ssl.key.password=server_key_password
ssl.truststore.location=/etc/kafka/secrets/kafka.server.truststore.jks
ssl.truststore.password=server_truststore_password
# Comments must sit on their own line; text after a value becomes part of the value.
# SASL already authenticates clients, so client certificates are not demanded here.
# Use requested or required to add TLS client authentication.
ssl.client.auth=none
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# username/password: what this broker presents to other brokers.
# user_<name>=<password>: credentials the broker accepts from connecting clients.
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin_password" user_admin="admin_password" user_producer_user="producer_password";
Here, listeners defines a single SASL_SSL listener on port 9093: every connection starts with a TLS handshake and then performs SASL authentication inside the encrypted channel. The broker uses its own keystore (kafka.server.keystore.jks) to present its identity. Its truststore (kafka.server.truststore.jks) is used to verify other brokers’ certificates on inter-broker connections, and to verify client certificates when TLS client authentication is requested or required. sasl.enabled.mechanisms allows PLAIN, and the listener-scoped sasl.jaas.config entry holds the accepted credentials, including the producer_user account used by the producer.
No separate handler class is needed for PLAIN; the built-in PlainLoginModule covers it, and sasl.kerberos.service.name applies only to Kerberos (GSSAPI). For inter-broker communication, security.inter.broker.protocol=SASL_SSL together with sasl.mechanism.inter.broker.protocol=PLAIN means brokers authenticate to each other using PLAIN over TLS.
A subtle but important point is how SASL mechanisms interact with security protocols. listener.security.protocol.map maps a listener name to a security protocol, and the only valid protocols are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. There is no SASL_PLAIN protocol, and a listener mapped to SSL never performs SASL, so a mapping such as SSL:SSL,PLAIN:SASL_PLAIN does not work. The SASL mechanism (PLAIN, SCRAM-SHA-256, GSSAPI, and so on) is chosen separately, through sasl.enabled.mechanisms on the broker and sasl.mechanism on the client. The default map already maps the listener name SASL_SSL to the protocol SASL_SSL, so no explicit map is needed here; you need one only when you use custom listener names. For PLAIN, always use SASL_SSL rather than SASL_PLAINTEXT, because PLAIN credentials are sent unhashed and must travel inside the TLS tunnel.
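Two mistakes are easy to make in a server.properties file: trailing # comments on a value line, and protocol names that Kafka does not define. The following sketch checks for both. It is a hypothetical helper, not a Kafka tool: the class name BrokerConfigLint and its messages are assumptions, and the trailing-comment check is a simple heuristic. It relies on standard java.util.Properties parsing, where # starts a comment only at the beginning of a line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

/** Sketch: flag two common mistakes in broker properties text (hypothetical helper). */
final class BrokerConfigLint {

    /** The security protocol names Kafka defines. */
    private static final Set<String> PROTOCOLS =
            Set.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    static List<String> check(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();

        // Properties.load keeps everything after '=' as the value, including " # ..." text.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (props.getProperty(key).contains(" #")) {
                problems.add(key + ": text after '#' is part of the value, not a comment");
            }
        }

        // Each map entry is LISTENER_NAME:PROTOCOL; the protocol must be a known name.
        String map = props.getProperty("listener.security.protocol.map", "");
        for (String entry : map.split(",")) {
            if (entry.trim().isEmpty()) {
                continue;
            }
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2 || !PROTOCOLS.contains(parts[1].trim())) {
                problems.add("listener.security.protocol.map: invalid entry '" + entry.trim() + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String config = "ssl.client.auth=required # or requested\n"
                + "listener.security.protocol.map=SSL:SSL,PLAIN:SASL_PLAIN\n";
        check(config).forEach(System.out::println);
    }
}
```

Running main on the two sample lines reports one problem for the inline comment and one for the PLAIN:SASL_PLAIN entry, while a file containing only a SASL_SSL listener and comment-free values yields an empty list.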
The next concept to explore is how to manage users and access control lists (ACLs) for these authenticated users.