1. Введение
В этом руководстве мы рассмотрим базовую настройку подключения клиента Spring Boot к брокеру Apache Kafka с использованием аутентификации SSL.
Secure Sockets Layer (SSL) фактически устарел и был заменен Transport Layer Security (TLS) с 2015 года. Однако по историческим причинам Kafka (и Java) по-прежнему ссылаются на «SSL», и мы будем следовать этому соглашению в этой статье. также.
2. Обзор SSL
По умолчанию Apache Kafka отправляет все данные в виде открытого текста и без какой-либо аутентификации.
Прежде всего, мы можем настроить SSL для шифрования между брокером и клиентом. По умолчанию для этого требуется односторонняя аутентификация с использованием шифрования с открытым ключом, когда клиент аутентифицирует сертификат сервера .
Кроме того, сервер также может аутентифицировать клиента с помощью отдельного механизма (например, SSL или SASL), что позволяет использовать двустороннюю аутентификацию или взаимный TLS (mTLS). По сути, двусторонняя аутентификация SSL гарантирует, что и клиент, и сервер используют SSL-сертификаты для проверки подлинности друг друга и доверяют друг другу в обоих направлениях .
В этой статье брокер будет использовать SSL для аутентификации клиента , а хранилище ключей и хранилище доверенных сертификатов будут использоваться для хранения сертификатов и ключей.
Каждому брокеру требуется собственное хранилище ключей, содержащее закрытый ключ и открытый сертификат. Клиент использует свое хранилище доверенных сертификатов для аутентификации этого сертификата и доверия серверу. Точно так же каждому клиенту также требуется собственное хранилище ключей, которое содержит его закрытый ключ и открытый сертификат. Сервер использует свое хранилище доверенных сертификатов для аутентификации и доверия сертификату клиента, а также для установления безопасного соединения.
Хранилище доверенных сертификатов может содержать центр сертификации (ЦС), который может подписывать сертификаты . В этом случае брокер или клиент доверяет любому сертификату, подписанному ЦС, который присутствует в хранилище доверенных сертификатов . Это упрощает аутентификацию сертификата, поскольку добавление новых клиентов или брокеров не требует внесения изменений в хранилище доверенных сертификатов.
3. Зависимости и установка
Наш пример приложения будет простым приложением Spring Boot.
Чтобы подключиться к Kafka, добавим зависимость spring-kafka
в наш файл POM:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
Мы также будем использовать файл Docker Compose для настройки и тестирования установки сервера Kafka. Сначала сделаем это без какой-либо настройки SSL:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Теперь запустим контейнер:
docker-compose up
Это должно открыть брокер с конфигурацией по умолчанию.
4. Конфигурация брокера
Давайте начнем с рассмотрения минимальной конфигурации, необходимой брокеру для установки безопасных соединений.
4.1. Автономный брокер
Хотя в этом примере мы не используем автономный экземпляр брокера, полезно знать изменения конфигурации, необходимые для включения проверки подлинности SSL.
Во- первых, нам нужно настроить брокера для прослушивания SSL-соединений на порту 9093 в server.properties
:
listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
Затем необходимо настроить свойства, связанные с хранилищем ключей и хранилищем доверенных сертификатов , с указанием местоположений сертификатов и учетных данных:
ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password
Наконец, брокер должен быть настроен для аутентификации клиентов , чтобы обеспечить двустороннюю аутентификацию:
ssl.client.auth=required
4.2. Докер Сочинять
Поскольку мы используем Compose для управления нашей средой брокера, давайте добавим все вышеперечисленные свойства в наш файл docker-compose.yml
:
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
...
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
KAFKA_SSL_CLIENT_AUTH: 'required'
KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
volumes:
- ./certs/:/etc/kafka/secrets/certs
Здесь мы указали порт SSL (9093) в разделе конфигурации портов .
Кроме того, мы смонтировали папку проекта certs в разделе
томов
файла config. Он содержит необходимые сертификаты и соответствующие учетные данные.
Теперь при перезапуске стека с помощью Compose в журнале брокера отображаются соответствующие сведения о SSL:
...
kafka_1 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1 | ===> Configuring ...
<strong>kafka_1 | SSL is enabled.</strong>
....
kafka_1 | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
<strong>kafka_1 | advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1 | ssl.client.auth = required</strong>
<strong>kafka_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]</strong>
kafka_1 | ssl.endpoint.identification.algorithm = https
kafka_1 | ssl.key.password = [hidden]
kafka_1 | ssl.keymanager.algorithm = SunX509
<strong>kafka_1 | ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks</strong>
kafka_1 | ssl.keystore.password = [hidden]
kafka_1 | ssl.keystore.type = JKS
kafka_1 | ssl.principal.mapping.rules = DEFAULT
<strong>kafka_1 | ssl.protocol = TLSv1.3</strong>
kafka_1 | ssl.trustmanager.algorithm = PKIX
kafka_1 | ssl.truststore.certificates = null
<strong>kafka_1 | ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks</strong>
kafka_1 | ssl.truststore.password = [hidden]
kafka_1 | ssl.truststore.type = JKS
....
5. Клиент Spring Boot
Теперь, когда настройка сервера завершена, мы создадим необходимые компоненты Spring Boot. Они будут взаимодействовать с нашим брокером, который теперь требует SSL для двусторонней аутентификации.
5.1. Режиссер
Во-первых, давайте отправим сообщение в указанную тему с помощью KafkaTemplate
:
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message, String topic) {
log.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
.addCallback(
result -> log.info("Message sent to topic: {}", message),
ex -> log.error("Failed to send message", ex)
);
}
}
Метод отправки
является асинхронной операцией. Поэтому мы прикрепили простой обратный вызов, который просто регистрирует некоторую информацию, как только брокер получает сообщение.
5.2. Потребитель
Далее давайте создадим простого потребителя, используя @KafkaListener . Это подключается к брокеру и использует сообщения из той же темы, что и производитель:
public class KafkaConsumer {
public static final String TOPIC = "test-topic";
public final List<String> messages = new ArrayList<>();
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
}
В нашем демонстрационном приложении мы упростили задачу, и потребитель просто сохраняет сообщения в списке
. В реальной реальной системе потребитель получает сообщения и обрабатывает их в соответствии с бизнес-логикой приложения.
5.3. Конфигурация
Наконец, добавим необходимую конфигурацию в наш application.yml
:
spring:
kafka:
security:
protocol: "SSL"
bootstrap-servers: localhost:9093
ssl:
trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
trust-store-password: <password>
key-store-location: classpath:/client-certs/kafka.client.keystore.jks
key-store-password: <password>
# additional config for producer/consumer
Здесь мы установили необходимые свойства, предоставляемые Spring Boot, для настройки производителя и потребителя. Поскольку оба этих компонента подключаются к одному и тому же брокеру, мы можем объявить все основные свойства в разделе spring.kafka
. Однако, если бы производитель и потребитель подключались к разным брокерам, мы бы указали их в разделах spring.kafka.producer
и spring.kafka.consumer
соответственно.
В разделе конфигурации ssl мы
указываем на хранилище доверенных сертификатов JKS, чтобы аутентифицировать брокера Kafka . Он содержит сертификат центра сертификации, который также подписал сертификат брокера. Кроме того, мы также предоставили путь к хранилищу ключей клиента Spring, которое содержит сертификат, подписанный ЦС , который должен присутствовать в хранилище доверенных сертификатов на стороне брокера.
5.4. Тестирование
Поскольку мы используем файл Compose, давайте воспользуемся фреймворком Testcontainers для создания сквозного теста с нашим Producer
и Consumer
:
@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {
private static final String KAFKA_SERVICE = "kafka";
private static final int SSL_PORT = 9093;
@Container
public DockerComposeContainer<?> container =
new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
.withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());
@Autowired
private KafkaProducer kafkaProducer;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
String message = generateSampleMessage();
kafkaProducer.sendMessage(message, TOPIC);
await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}
private static String generateSampleMessage() {
return UUID.randomUUID().toString();
}
}
Когда мы запускаем тест, Testcontainers запускает брокера Kafka, используя наш файл Compose, включая конфигурацию SSL. Приложение также запускается со своей конфигурацией SSL и подключается к брокеру через зашифрованное и аутентифицированное соединение. Поскольку это асинхронная последовательность событий, мы использовали Awaitlity для опроса ожидаемого сообщения в хранилище сообщений потребителя. Это проверяет всю конфигурацию и успешную двустороннюю аутентификацию между брокером и клиентом.
6. Заключение
В этой статье мы рассмотрели основы настройки SSL-аутентификации, необходимой между брокером Kafka и клиентом Spring Boot.
Сначала мы рассмотрели настройку брокера, необходимую для включения двусторонней аутентификации. Затем мы рассмотрели конфигурацию, необходимую на стороне клиента для подключения к брокеру через зашифрованное и аутентифицированное соединение. Наконец, мы использовали интеграционный тест для проверки безопасного соединения между брокером и клиентом.
Как всегда, полный исходный код доступен на GitHub .