Перейти к основному содержимому

Настройка Kafka SSL с использованием Spring Boot

· 7 мин. чтения

1. Введение

В этом руководстве мы рассмотрим базовую настройку подключения клиента Spring Boot к брокеру Apache Kafka с использованием аутентификации SSL.

Secure Sockets Layer (SSL) фактически устарел и был заменен Transport Layer Security (TLS) с 2015 года. Однако по историческим причинам Kafka (и Java) по-прежнему ссылаются на «SSL», и мы будем следовать этому соглашению в этой статье. также.

2. Обзор SSL

По умолчанию Apache Kafka отправляет все данные в виде открытого текста и без какой-либо аутентификации.

Прежде всего, мы можем настроить SSL для шифрования между брокером и клиентом. По умолчанию для этого требуется односторонняя аутентификация с использованием шифрования с открытым ключом, когда клиент аутентифицирует сертификат сервера .

Кроме того, сервер также может аутентифицировать клиента с помощью отдельного механизма (например, SSL или SASL), что позволяет использовать двустороннюю аутентификацию или взаимный TLS (mTLS). По сути, двусторонняя аутентификация SSL гарантирует, что и клиент, и сервер используют SSL-сертификаты для проверки подлинности друг друга и доверяют друг другу в обоих направлениях .

В этой статье брокер будет использовать SSL для аутентификации клиента , а хранилище ключей и хранилище доверенных сертификатов будут использоваться для хранения сертификатов и ключей.

Каждому брокеру требуется собственное хранилище ключей, содержащее закрытый ключ и открытый сертификат. Клиент использует свое хранилище доверенных сертификатов для аутентификации этого сертификата и доверия серверу. Точно так же каждому клиенту также требуется собственное хранилище ключей, которое содержит его закрытый ключ и открытый сертификат. Сервер использует свое хранилище доверенных сертификатов для аутентификации и доверия сертификату клиента, а также для установления безопасного соединения.

Хранилище доверенных сертификатов может содержать центр сертификации (ЦС), который может подписывать сертификаты . В этом случае брокер или клиент доверяет любому сертификату, подписанному ЦС, который присутствует в хранилище доверенных сертификатов . Это упрощает аутентификацию сертификата, поскольку добавление новых клиентов или брокеров не требует внесения изменений в хранилище доверенных сертификатов.

3. Зависимости и установка

Наш пример приложения будет простым приложением Spring Boot.

Чтобы подключиться к Kafka, добавим зависимость spring-kafka в наш файл POM:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>

Мы также будем использовать файл Docker Compose для настройки и тестирования установки сервера Kafka. Сначала сделаем это без какой-либо настройки SSL:

---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Теперь запустим контейнер:

docker-compose up

Это должно открыть брокер с конфигурацией по умолчанию.

4. Конфигурация брокера

Давайте начнем с рассмотрения минимальной конфигурации, необходимой брокеру для установки безопасных соединений.

4.1. Автономный брокер

Хотя в этом примере мы не используем автономный экземпляр брокера, полезно знать изменения конфигурации, необходимые для включения проверки подлинности SSL.

Во- первых, нам нужно настроить брокера для прослушивания SSL-соединений на порту 9093 в server.properties :

listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

Затем необходимо настроить свойства, связанные с хранилищем ключей и хранилищем доверенных сертификатов , с указанием местоположений сертификатов и учетных данных:

ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password

Наконец, брокер должен быть настроен для аутентификации клиентов , чтобы обеспечить двустороннюю аутентификацию:

ssl.client.auth=required

4.2. Докер Сочинять

Поскольку мы используем Compose для управления нашей средой брокера, давайте добавим все вышеперечисленные свойства в наш файл docker-compose.yml :

kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
...
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
KAFKA_SSL_CLIENT_AUTH: 'required'
KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
volumes:
- ./certs/:/etc/kafka/secrets/certs

Здесь мы указали порт SSL (9093) в разделе конфигурации портов . Кроме того, мы смонтировали папку проекта certs в разделе томов файла config. Он содержит необходимые сертификаты и соответствующие учетные данные.

Теперь при перезапуске стека с помощью Compose в журнале брокера отображаются соответствующие сведения о SSL:

...
kafka_1 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1 | ===> Configuring ...
<strong>kafka_1 | SSL is enabled.</strong>
....
kafka_1 | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
<strong>kafka_1 | advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1 | ssl.client.auth = required</strong>
<strong>kafka_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]</strong>
kafka_1 | ssl.endpoint.identification.algorithm = https
kafka_1 | ssl.key.password = [hidden]
kafka_1 | ssl.keymanager.algorithm = SunX509
<strong>kafka_1 | ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks</strong>
kafka_1 | ssl.keystore.password = [hidden]
kafka_1 | ssl.keystore.type = JKS
kafka_1 | ssl.principal.mapping.rules = DEFAULT
<strong>kafka_1 | ssl.protocol = TLSv1.3</strong>
kafka_1 | ssl.trustmanager.algorithm = PKIX
kafka_1 | ssl.truststore.certificates = null
<strong>kafka_1 | ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks</strong>
kafka_1 | ssl.truststore.password = [hidden]
kafka_1 | ssl.truststore.type = JKS
....

5. Клиент Spring Boot

Теперь, когда настройка сервера завершена, мы создадим необходимые компоненты Spring Boot. Они будут взаимодействовать с нашим брокером, который теперь требует SSL для двусторонней аутентификации.

5.1. Режиссер

Во-первых, давайте отправим сообщение в указанную тему с помощью KafkaTemplate :

public class KafkaProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String message, String topic) {
log.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
.addCallback(
result -> log.info("Message sent to topic: {}", message),
ex -> log.error("Failed to send message", ex)
);
}
}

Метод отправки является асинхронной операцией. Поэтому мы прикрепили простой обратный вызов, который просто регистрирует некоторую информацию, как только брокер получает сообщение.

5.2. Потребитель

Далее давайте создадим простого потребителя, используя @KafkaListener . Это подключается к брокеру и использует сообщения из той же темы, что и производитель:

public class KafkaConsumer {

public static final String TOPIC = "test-topic";

public final List<String> messages = new ArrayList<>();

@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
}

В нашем демонстрационном приложении мы упростили задачу, и потребитель просто сохраняет сообщения в списке . В реальной реальной системе потребитель получает сообщения и обрабатывает их в соответствии с бизнес-логикой приложения.

5.3. Конфигурация

Наконец, добавим необходимую конфигурацию в наш application.yml :

spring:
kafka:
security:
protocol: "SSL"
bootstrap-servers: localhost:9093
ssl:
trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
trust-store-password: <password>
key-store-location: classpath:/client-certs/kafka.client.keystore.jks
key-store-password: <password>

# additional config for producer/consumer

Здесь мы установили необходимые свойства, предоставляемые Spring Boot, для настройки производителя и потребителя. Поскольку оба этих компонента подключаются к одному и тому же брокеру, мы можем объявить все основные свойства в разделе spring.kafka . Однако, если бы производитель и потребитель подключались к разным брокерам, мы бы указали их в разделах spring.kafka.producer и spring.kafka.consumer соответственно.

В разделе конфигурации ssl мы указываем на хранилище доверенных сертификатов JKS, чтобы аутентифицировать брокера Kafka . Он содержит сертификат центра сертификации, который также подписал сертификат брокера. Кроме того, мы также предоставили путь к хранилищу ключей клиента Spring, которое содержит сертификат, подписанный ЦС , который должен присутствовать в хранилище доверенных сертификатов на стороне брокера.

5.4. Тестирование

Поскольку мы используем файл Compose, давайте воспользуемся фреймворком Testcontainers для создания сквозного теста с нашим Producer и Consumer :

@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {

private static final String KAFKA_SERVICE = "kafka";
private static final int SSL_PORT = 9093;

@Container
public DockerComposeContainer<?> container =
new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
.withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());

@Autowired
private KafkaProducer kafkaProducer;

@Autowired
private KafkaConsumer kafkaConsumer;

@Test
void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
String message = generateSampleMessage();
kafkaProducer.sendMessage(message, TOPIC);

await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}

private static String generateSampleMessage() {
return UUID.randomUUID().toString();
}
}

Когда мы запускаем тест, Testcontainers запускает брокера Kafka, используя наш файл Compose, включая конфигурацию SSL. Приложение также запускается со своей конфигурацией SSL и подключается к брокеру через зашифрованное и аутентифицированное соединение. Поскольку это асинхронная последовательность событий, мы использовали Awaitlity для опроса ожидаемого сообщения в хранилище сообщений потребителя. Это проверяет всю конфигурацию и успешную двустороннюю аутентификацию между брокером и клиентом.

6. Заключение

В этой статье мы рассмотрели основы настройки SSL-аутентификации, необходимой между брокером Kafka и клиентом Spring Boot.

Сначала мы рассмотрели настройку брокера, необходимую для включения двусторонней аутентификации. Затем мы рассмотрели конфигурацию, необходимую на стороне клиента для подключения к брокеру через зашифрованное и аутентифицированное соединение. Наконец, мы использовали интеграционный тест для проверки безопасного соединения между брокером и клиентом.

Как всегда, полный исходный код доступен на GitHub .