Перейти к основному содержимому

Kafka Streams с Spring Boot

· 7 мин. чтения

1. Введение

В этой статье мы увидим, как настроить Kafka Streams с помощью Spring Boot. Kafka Streams — это клиентская библиотека, созданная поверх Apache Kafka. ^ Он позволяет обрабатывать неограниченный поток событий декларативным образом.

Некоторыми реальными примерами потоковой передачи данных могут быть данные датчиков, потоки событий фондового рынка и системные журналы. В этом уроке мы создадим простое приложение для потоковой передачи подсчета слов. Давайте начнем с обзора Kafka Streams, а затем настроим пример вместе с его тестами в Spring Boot.

2. Обзор

Kafka Streams обеспечивает двойственность между темами Kafka и таблицами реляционной базы данных. Это позволяет нам выполнять такие операции, как объединение, группировка, агрегирование и фильтрация одного или нескольких потоковых событий.

Важной концепцией Kafka Streams является топология процессора. Топология процессора — это план операций Kafka Stream с одним или несколькими потоками событий . По существу, топологию процессора можно рассматривать как ориентированный ациклический граф . На этом графике узлы подразделяются на узлы источника, процессора и приемника, тогда как ребра представляют поток событий потока.

Источник в верхней части топологии получает потоковые данные от Kafka, передает их процессорным узлам, где выполняются пользовательские операции, и передает их через узлы-приемники в новую тему Kafka. Наряду с основной обработкой состояние потока периодически сохраняется с использованием контрольных точек для обеспечения отказоустойчивости и отказоустойчивости.

3. Зависимости

Мы начнем с добавления зависимостей spring-kafka и kafka-streams в наш POM:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId
<artifactId>kafka-streams</artifactId>
<version>2.7.1</version>
</dependency>

4. Пример

Наш пример приложения считывает потоковые события из входной темы Kafka. Как только записи прочитаны, он обрабатывает их, разделяя текст и подсчитывая отдельные слова. Впоследствии он отправляет обновленное количество слов на выход Kafka. В дополнение к теме вывода мы также создадим простую службу REST для предоставления этого счетчика через конечную точку HTTP.

В целом, выходная тема будет постоянно обновляться за счет слов, извлеченных из входных событий, и их обновленного количества.

4.1. Конфигурация

Во-первых, давайте определим конфигурацию потока Kafka в классе конфигурации Java:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, "streams-app");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

return new KafkaStreamsConfiguration(props);
}

// other config
}

Здесь мы использовали аннотацию @EnableKafkaStreams для автоматической настройки необходимых компонентов. Для этой автоматической настройки требуется bean-компонент KafkaStreamsConfiguration с именем, указанным в DEFAULT_STREAMS_CONFIG_BEAN_NAME . В результате Spring Boot использует эту конфигурацию и создает клиент KafkaStreams для управления жизненным циклом нашего приложения .

В нашем примере мы предоставили идентификатор приложения, сведения о подключении к серверу начальной загрузки и SerDes (сериализатор/десериализатор) для нашей конфигурации.

4.2. Топология

Теперь, когда мы настроили конфигурацию, давайте создадим топологию для нашего приложения, чтобы вести подсчет слов во входных сообщениях:

@Component
public class WordCountProcessor {

private static final Serde<String> STRING_SERDE = Serdes.String();

@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
KStream<String, String> messageStream = streamsBuilder
.stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

KTable<String, Long> wordCounts = messageStream
.mapValues((ValueMapper<String, String>) String::toLowerCase)
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
.count();

wordCounts.toStream().to("output-topic");
}
}

Здесь мы определили метод конфигурации и аннотировали его с помощью @Autowired . Spring обрабатывает эту аннотацию и связывает соответствующий bean-компонент из контейнера с аргументом StreamsBuilder . В качестве альтернативы мы также можем создать bean-компонент в классе конфигурации для создания топологии.

StreamsBuilder дает нам доступ ко всем API Kafka Streams и становится похожим на обычное приложение Kafka Streams. В нашем примере мы использовали этот высокоуровневый DSL для определения преобразований для нашего приложения:

  • Создайте KStream из входной темы, используя указанный ключ и значение SerDes.
  • Создайте KTable , преобразовав, разделив, сгруппировав и подсчитав данные.
  • Материализуйте результат в выходной поток.

По сути, Spring Boot обеспечивает очень тонкую оболочку вокруг Streams API, управляя жизненным циклом нашего экземпляра KStream . Он создает и настраивает необходимые компоненты для топологии и запускает наше приложение Streams. Важно отметить, что это позволяет нам сосредоточиться на нашей основной бизнес-логике, в то время как Spring управляет жизненным циклом.

4.3. РЕСТ-сервис

После определения нашего конвейера с декларативными шагами давайте создадим контроллер REST. Это обеспечивает конечные точки для отправки сообщений POST во входную тему и получения количества для указанного слова. Но что важно, приложение извлекает данные из хранилища состояний Kafka Streams, а не из топика вывода .

Во-первых, давайте изменим KTable из предыдущей и материализуем агрегированные счетчики как локальное хранилище состояний. Затем это можно запросить у контроллера REST:

KTable<String, Long> wordCounts = textStream
.mapValues((ValueMapper<String, String>) String::toLowerCase)
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
.count(Materialized.as("counts"));

После этого мы можем обновить наш контроллер, чтобы получить значение счетчика из этого хранилища состояний счетчиков :

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
);
return counts.get(word);
}

Здесь factoryBean — это экземпляр StreamsBuilderFactoryBean , подключенный к контроллеру. Это обеспечивает экземпляр KafkaStreams , управляемый этим фабричным компонентом. Таким образом, мы можем получить созданное ранее хранилище состояний ключей/значений counts , представленное KTable . На этом этапе мы можем использовать это, чтобы получить текущее количество запрошенных слов из локального хранилища состояний.

5. Тестирование

Тестирование является важной частью разработки и проверки топологии нашего приложения. Библиотека тестов Spring Kafka и контейнеры Testcontainers обеспечивают отличную поддержку для тестирования нашего приложения на различных уровнях.

5.1. Модульное тестирование

Во-первых, давайте настроим модульный тест для нашей топологии, используя TopologyTestDriver . Это основной тестовый инструмент для тестирования приложения Kafka Streams:

@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
wordCountProcessor.buildPipeline(streamsBuilder);
Topology topology = streamsBuilder.build();

try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
TestInputTopic<String, String> inputTopic = topologyTestDriver
.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());

TestOutputTopic<String, Long> outputTopic = topologyTestDriver
.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

inputTopic.pipeInput("key", "hello world");
inputTopic.pipeInput("key2", "hello");

assertThat(outputTopic.readKeyValuesToList())
.containsExactly(
KeyValue.pair("hello", 1L),
KeyValue.pair("world", 1L),
KeyValue.pair("hello", 2L)
);
}
}

Здесь первое, что нам нужно, — это топология , которая инкапсулирует нашу тестируемую бизнес-логику из WordCountProcessor . Теперь мы можем использовать это с TopologyTestDriver для создания входных и выходных тем для нашего тестирования. Важно отметить, что это устраняет необходимость запуска брокера и проверки поведения конвейера . Другими словами, это позволяет быстро и легко проверить поведение нашего конвейера без использования настоящего брокера Kafka.

5.2. Интеграционное тестирование

Наконец, давайте воспользуемся фреймворком Testcontainers для сквозного тестирования нашего приложения. Это использует работающий брокер Kafka и запускает наше приложение для полного теста:

@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

@Container
private static final KafkaContainer KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

// other test setup

@Test
void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
postMessage("test message");

startOutputTopicConsumer();

// assert correct counts on output topic
assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

// assert correct count from REST service
assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
}
}

Здесь мы отправили POST нашему контроллеру REST, который, в свою очередь, отправляет сообщение во входную тему Kafka. В рамках установки мы также запустили потребителя Kafka. Это асинхронно прослушивает выходную тему Kafka и обновляет BlockingQueue с полученным количеством слов.

Во время выполнения теста приложение должно обрабатывать входные сообщения. Далее мы можем проверить ожидаемый вывод как из темы, так и из хранилища состояний с помощью службы REST.

6. Заключение

В этом руководстве мы увидели, как создать простое приложение, управляемое событиями, для обработки сообщений с помощью Kafka Streams и Spring Boot.

После краткого обзора основных концепций потоковой передачи мы рассмотрели настройку и создание топологии потоков. Затем мы увидели, как интегрировать это с функциональностью REST, предоставляемой Spring Boot. Наконец, мы рассмотрели некоторые подходы к эффективному тестированию и проверке нашей топологии и поведения приложений.

Как всегда, полный исходный код доступен на GitHub .