Перейти к основному содержимому

Введение в Apache Kafka с Spring

· 7 мин. чтения

1. Обзор

Apache Kafka — это распределенная и отказоустойчивая система обработки потоков.

В этом руководстве мы рассмотрим поддержку Spring для Kafka и уровень абстракции, который он обеспечивает по сравнению с собственными клиентскими API Kafka Java.

Spring Kafka предлагает простую и типичную модель программирования шаблонов Spring с KafkaTemplate и управляемыми сообщениями POJO через аннотацию @KafkaListener .

2. Установка и настройка

Чтобы загрузить и установить Kafka, обратитесь к официальному руководству здесь .

Нам также нужно добавить зависимость spring- kafka в наш pom.xml :

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>

Последнюю версию этого артефакта можно найти здесь .

Наш пример приложения будет приложением Spring Boot.

В этой статье предполагается, что сервер запущен с конфигурацией по умолчанию и порты сервера не изменены.

3. Настройка тем

Раньше мы запускали инструменты командной строки для создания тем в Kafka:

$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic mytopic

Но с введением AdminClient в Kafka мы теперь можем создавать темы программно.

Нам нужно добавить bean- компонент KafkaAdmin Spring, который будет автоматически добавлять темы для всех bean-компонентов типа NewTopic :

@Configuration
public class KafkaTopicConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
return new NewTopic("foreach", 1, (short) 1);
}
}

4. Создание сообщений

Чтобы создавать сообщения, нам сначала нужно настроить ProducerFactory . Это задает стратегию создания экземпляров Kafka Producer .

Затем нам нужен KafkaTemplate , который обертывает экземпляр Producer и предоставляет удобные методы для отправки сообщений в темы Kafka.

Экземпляры производителя являются потокобезопасными. Таким образом, использование одного экземпляра в контексте приложения даст более высокую производительность. Следовательно, экземпляры KakfaTemplate также являются потокобезопасными, и рекомендуется использовать один экземпляр.

4.1. Конфигурация производителя

@Configuration
public class KafkaProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

4.2. Публикация сообщений

Мы можем отправлять сообщения с помощью класса KafkaTemplate :

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}

API отправки возвращает объект ListenableFuture . Если мы хотим заблокировать отправляющий поток и получить результат об отправленном сообщении, мы можем вызвать get API объекта ListenableFuture . Поток будет ждать результата, но это замедлит производителя.

Kafka — это платформа для быстрой обработки потоков. Поэтому лучше обрабатывать результаты асинхронно, чтобы последующие сообщения не ждали результата предыдущего сообщения.

Мы можем сделать это с помощью обратного вызова:

public void sendMessage(String message) {

ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topicName, message);

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}

5. Использование сообщений

5.1. Конфигурация потребителя

Для использования сообщений нам нужно настроить ConsumerFactory и KafkaListenerContainerFactory . Как только эти bean-компоненты станут доступны в фабрике bean-компонентов Spring, потребители на основе POJO можно настроить с помощью аннотации @KafkaListener .

В классе конфигурации требуется аннотация @EnableKafka , чтобы включить обнаружение аннотации @KafkaListener в bean-компонентах, управляемых Spring :

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

5.2. Использование сообщений

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group foo: " + message);
}

Мы можем реализовать несколько слушателей для темы , каждый с другим идентификатором группы. Кроме того, один потребитель может прослушивать сообщения из разных тем:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring также поддерживает извлечение одного или нескольких заголовков сообщений с помощью аннотации @Header в слушателе:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}

5.3. Использование сообщений из определенного раздела

Обратите внимание, что мы создали тему foreach только с одним разделом.

Однако для темы с несколькими разделами @KafkaListener может явно подписаться на конкретный раздел темы с начальным смещением:

@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}

Поскольку в этом прослушивателе для InitialOffset установлено значение 0, все ранее использованные сообщения из разделов 0 и 3 будут повторно использоваться каждый раз при инициализации этого прослушивателя.

Если нам не нужно устанавливать смещение, мы можем использовать свойство partitions аннотации @TopicPartition , чтобы установить только разделы без смещения:

@KafkaListener(topicPartitions 
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Добавление фильтра сообщений для слушателей

Мы можем настроить прослушиватели для использования определенных типов сообщений, добавив собственный фильтр. Это можно сделать, установив для RecordFilterStrategy значение KafkaListenerContainerFactory :

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
record -> record.value().contains("World"));
return factory;
}

Затем мы можем настроить прослушиватель для использования этой фабрики контейнеров:

@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener: " + message);
}

В этом прослушивателе все сообщения, соответствующие фильтру, будут отброшены.

6. Пользовательские конвертеры сообщений

До сих пор мы рассматривали только отправку и получение строк в виде сообщений. Однако мы также можем отправлять и получать пользовательские объекты Java. Для этого требуется настроить соответствующий сериализатор в ProducerFactory и десериализатор в ConsumerFactory .

Давайте посмотрим на простой класс bean-компонента , который мы будем отправлять в виде сообщений:

public class Greeting {

private String msg;
private String name;

// standard getters, setters and constructor
}

6.1. Создание пользовательских сообщений

В этом примере мы будем использовать JsonSerializer .

Давайте посмотрим на код для ProducerFactory и KafkaTemplate :

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
// ...
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}

Мы можем использовать этот новый KafkaTemplate для отправки приветственного сообщения:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Использование пользовательских сообщений

Точно так же давайте изменим ConsumerFactory и KafkaListenerContainerFactory , чтобы правильно десериализовать приветственное сообщение:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}

Сериализатор и десериализатор Spring-kafka JSON использует библиотеку Jackson , которая также является дополнительной зависимостью Maven для проекта spring-kafka.

Итак, давайте добавим его в наш pom.xml :

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>

Вместо использования последней версии Jackson рекомендуется использовать версию, добавленную в pom.xml spring-kafka.

Наконец, нам нужно написать прослушиватель для приема приветственных сообщений:

@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
// process greeting message
}

7. Заключение

В этой статье мы рассмотрели основы поддержки Spring для Apache Kafka. Мы кратко рассмотрели классы, используемые для отправки и получения сообщений.

Полный исходный код этой статьи можно найти на GitHub .

Перед запуском кода убедитесь, что сервер Kafka запущен и темы созданы вручную.