1. Обзор
В этом руководстве мы кратко представим Apache Kafka , а затем посмотрим, как программно создавать и настраивать разделы в кластере Kafka.
2. Знакомство с Кафкой
Apache Kafka — это мощная, высокопроизводительная распределенная платформа для потоковой передачи событий.
Как правило, приложения-производители публикуют события в Kafka, а потребители подписываются на эти события, чтобы читать и обрабатывать их. Kafka использует темы для хранения и классификации этих событий, например, в приложении электронной коммерции может быть тема «заказы».
Темы Kafka разделены, что позволяет распределять данные между несколькими брокерами для масштабируемости. Их можно реплицировать, чтобы сделать данные отказоустойчивыми и высокодоступными. Темы также сохраняют события даже после использования столько времени, сколько требуется. Все это управляется для каждой темы с помощью инструментов командной строки Kafka и конфигураций ключ-значение .
Однако в дополнение к инструментам командной строки Kafka также предоставляет Admin API для управления темами, брокерами и другими объектами Kafka и их проверки . В нашем примере мы будем использовать этот API для создания новых тем.
3. Зависимости
Чтобы использовать Admin API, давайте добавим зависимость kafka-clients в наш pom.xml
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
4. Настройка Кафки
Прежде чем создавать новые темы, нам нужен хотя бы одноузловой кластер Kafka.
В этом руководстве мы будем использовать платформу Testcontainers для создания экземпляра контейнера Kafka. Затем мы можем запустить надежные и автономные интеграционные тесты, которые не полагаются на работающий внешний сервер Kafka. Для этого нам понадобятся еще две зависимости специально для наших тестов.
Во-первых, давайте добавим зависимость Testcontainers Kafka к нашему pom.xml
:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
Далее мы добавим артефакт junit-jupiter для запуска тестов Testcontainer с использованием JUnit 5:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
Теперь, когда у нас настроены все необходимые зависимости, мы можем написать простое приложение для программного создания новых тем.
5. Административный API
Начнем с создания нового экземпляра Properties
с минимальной конфигурацией для локального брокера:
Properties properties = new Properties();
properties.put(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()
);
Теперь мы можем получить экземпляр Admin :
Admin admin = Admin.create(properties)
Метод create
принимает объект Properties
(или Map)
со свойством bootstrap.servers
и возвращает потокобезопасный экземпляр.
Клиент администрирования использует это свойство для обнаружения брокеров в кластере и последующего выполнения любых операций администрирования. Таким образом, обычно достаточно указать два или три адреса брокера, чтобы покрыть возможность того, что некоторые экземпляры будут недоступны.
Класс AdminClientConfig
содержит константы для всех записей конфигурации клиента администратора .
6. Создание темы
Давайте начнем с создания теста JUnit 5 с Testcontainers , чтобы проверить успешное создание темы. Мы будем использовать модуль Kafka , который использует официальный образ Kafka Docker для платформы Confluent OSS :
@Test
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
kafkaTopicApplication.createTopic("test-topic");
String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand)
.getStdout();
assertThat(stdout).contains("test-topic");
}
Здесь Testcontainers будут автоматически создавать экземпляр контейнера Kafka и управлять им во время выполнения теста. Мы просто вызываем код нашего приложения и проверяем, успешно ли создана тема в работающем контейнере.
6.1. Создать с параметрами по умолчанию
Разделы тем и коэффициент репликации являются ключевыми соображениями для новых тем. Мы не будем усложнять и создадим наш пример темы с 1 разделом и коэффициентом репликации, равным 1:
try (Admin admin = Admin.create(properties)) {
int partitions = 1;
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic)
);
KafkaFuture<Void> future = result.values().get(topicName);
future.get();
}
Здесь мы использовали Admin. createTopics
для создания пакета новых тем с параметрами по умолчанию. Поскольку интерфейс администратора
расширяет интерфейс AutoCloseable
, мы использовали попытку с ресурсами для выполнения нашей операции. Это гарантирует, что ресурсы высвобождаются надлежащим образом.
Важно отметить, что этот метод взаимодействует с посредником контроллера и выполняется асинхронно. Возвращенный объект CreateTopicsResult предоставляет
KafkaFuture
для доступа к результатам каждого элемента в пакете запросов. Это соответствует шаблону асинхронного программирования Java и позволяет вызывающим объектам получать результаты операции с помощью метода Future.get
.
Для синхронного поведения мы можем вызвать этот метод немедленно, чтобы получить результат нашей операции. Это блокирует до тех пор, пока операция не завершится или не завершится ошибкой. В случае сбоя это приводит к исключению ExecutionException, которое оборачивает основную причину.
6.2. Создать с параметрами
Вместо параметров по умолчанию мы также можем использовать перегруженную форму файла Admin. createTopics
и предоставить некоторые параметры через объект CreateTopicsOptions
. Мы можем использовать их для изменения поведения клиента администратора при создании новых тем:
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
.validateOnly(true)
.retryOnQuotaViolation(false);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic), topicOptions
);
Здесь мы установили для параметра validateOnly
значение true, что означает, что клиент будет только проверять без фактического создания темы. Точно так же для параметра retryOnQuotaViolation
установлено значение false, чтобы операция не повторялась в случае нарушения квоты.
6.3. Конфигурация новой темы
Kafka имеет широкий спектр конфигураций тем, которые контролируют поведение тем, например, сохранение и сжатие данных и т. д. Они имеют как значение по умолчанию для сервера, так и необязательное переопределение для каждой темы.
Мы можем предоставить конфигурации темы, используя карту конфигурации для новой темы :
// Create a compacted topic with 'lz4' compression codec
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
.configs(newTopicConfig);
Класс TopicConfig
из Admin API содержит ключи, которые можно использовать для настройки тем во время создания.
7. Другие тематические операции
Помимо возможности создавать новые темы, Admin API также имеет операции по удалению , перечислению и описанию тем . Все эти операции, связанные с темой, следуют той же схеме, что и при создании темы.
У каждого из этих методов операции есть перегруженная версия, принимающая в качестве входных данных объект xxxTopicOptions .
Все эти методы возвращают соответствующий объект xxxTopicsResult
. Это, в свою очередь, предоставляет KafkaFuture
доступ к результатам асинхронной операции.
Наконец, также стоит упомянуть, что с момента его появления в Kafka версии 0.11.0.0 API администратора все еще развивается, как указано в аннотации InterfaceStability.Evolving .
Это означает, что API может измениться в будущем, а второстепенная версия может нарушить совместимость.
8. Заключение
В этом руководстве мы увидели, как создать новую тему в Kafka с помощью клиента администрирования Java.
Изначально мы создали тему с параметрами по умолчанию, а затем с явными параметрами. После этого мы увидели, как настроить новую тему, используя различные свойства. Наконец, мы кратко рассмотрели другие связанные с темой операции с использованием клиента администратора.
Попутно мы также увидели, как использовать Testcontainers для настройки простого кластера с одним узлом из наших тестов.
Как всегда, полный исходный код статьи доступен на GitHub .