1. Введение
В этой статье мы увидим, как настроить Kafka Streams с помощью Spring Boot. Kafka Streams — это клиентская библиотека, созданная поверх Apache Kafka. ^ Он позволяет обрабатывать неограниченный поток событий декларативным образом.
Некоторыми реальными примерами потоковой передачи данных могут быть данные датчиков, потоки событий фондового рынка и системные журналы. В этом уроке мы создадим простое приложение для потоковой передачи подсчета слов. Давайте начнем с обзора Kafka Streams, а затем настроим пример вместе с его тестами в Spring Boot.
2. Обзор
Kafka Streams обеспечивает двойственность между темами Kafka и таблицами реляционной базы данных. Это позволяет нам выполнять такие операции, как объединение, группировка, агрегирование и фильтрация одного или нескольких потоковых событий.
Важной концепцией Kafka Streams является топология процессора. Топология процессора — это план операций Kafka Stream с одним или несколькими потоками событий . По существу, топологию процессора можно рассматривать как ориентированный ациклический граф . На этом графике узлы подразделяются на узлы источника, процессора и приемника, тогда как ребра представляют поток событий потока.
Источник в верхней части топологии получает потоковые данные от Kafka, передает их процессорным узлам, где выполняются пользовательские операции, и передает их через узлы-приемники в новую тему Kafka. Наряду с основной обработкой состояние потока периодически сохраняется с использованием контрольных точек для обеспечения отказоустойчивости и отказоустойчивости.
3. Зависимости
Мы начнем с добавления зависимостей spring-kafka
и kafka-streams
в наш POM:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId
<artifactId>kafka-streams</artifactId>
<version>2.7.1</version>
</dependency>
4. Пример
Наш пример приложения считывает потоковые события из входной темы Kafka. Как только записи прочитаны, он обрабатывает их, разделяя текст и подсчитывая отдельные слова. Впоследствии он отправляет обновленное количество слов на выход Kafka. В дополнение к теме вывода мы также создадим простую службу REST для предоставления этого счетчика через конечную точку HTTP.
В целом, выходная тема будет постоянно обновляться за счет слов, извлеченных из входных событий, и их обновленного количества.
4.1. Конфигурация
Во-первых, давайте определим конфигурацию потока Kafka в классе конфигурации Java:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, "streams-app");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
// other config
}
Здесь мы использовали аннотацию @EnableKafkaStreams
для автоматической настройки необходимых компонентов. Для этой автоматической настройки требуется bean-компонент KafkaStreamsConfiguration
с именем, указанным в DEFAULT_STREAMS_CONFIG_BEAN_NAME
. В результате Spring Boot использует эту конфигурацию и создает клиент KafkaStreams
для управления жизненным циклом нашего приложения .
В нашем примере мы предоставили идентификатор приложения, сведения о подключении к серверу начальной загрузки и SerDes (сериализатор/десериализатор) для нашей конфигурации.
4.2. Топология
Теперь, когда мы настроили конфигурацию, давайте создадим топологию для нашего приложения, чтобы вести подсчет слов во входных сообщениях:
@Component
public class WordCountProcessor {
private static final Serde<String> STRING_SERDE = Serdes.String();
@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
KStream<String, String> messageStream = streamsBuilder
.stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));
KTable<String, Long> wordCounts = messageStream
.mapValues((ValueMapper<String, String>) String::toLowerCase)
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
.count();
wordCounts.toStream().to("output-topic");
}
}
Здесь мы определили метод конфигурации и аннотировали его с помощью @Autowired
. Spring обрабатывает эту аннотацию и связывает соответствующий bean-компонент из контейнера с аргументом StreamsBuilder
. В качестве альтернативы мы также можем создать bean-компонент в классе конфигурации для создания топологии.
StreamsBuilder
дает нам доступ ко всем API Kafka Streams и становится похожим на обычное приложение Kafka Streams. В нашем примере мы использовали этот высокоуровневый DSL для определения преобразований для нашего приложения:
- Создайте
KStream
из входной темы, используя указанный ключ и значение SerDes. - Создайте
KTable
, преобразовав, разделив, сгруппировав и подсчитав данные. - Материализуйте результат в выходной поток.
По сути, Spring Boot обеспечивает очень тонкую оболочку вокруг Streams API, управляя жизненным циклом нашего экземпляра KStream
. Он создает и настраивает необходимые компоненты для топологии и запускает наше приложение Streams. Важно отметить, что это позволяет нам сосредоточиться на нашей основной бизнес-логике, в то время как Spring управляет жизненным циклом.
4.3. РЕСТ-сервис
После определения нашего конвейера с декларативными шагами давайте создадим контроллер REST. Это обеспечивает конечные точки для отправки сообщений POST во входную тему и получения количества для указанного слова. Но что важно, приложение извлекает данные из хранилища состояний Kafka Streams, а не из топика вывода .
Во-первых, давайте изменим KTable
из предыдущей и материализуем агрегированные счетчики как локальное хранилище состояний. Затем это можно запросить у контроллера REST:
KTable<String, Long> wordCounts = textStream
.mapValues((ValueMapper<String, String>) String::toLowerCase)
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
.count(Materialized.as("counts"));
После этого мы можем обновить наш контроллер, чтобы получить значение счетчика из этого хранилища состояний счетчиков :
@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
);
return counts.get(word);
}
Здесь factoryBean
— это экземпляр StreamsBuilderFactoryBean
, подключенный к контроллеру. Это обеспечивает экземпляр KafkaStreams
, управляемый этим фабричным компонентом. Таким образом, мы можем получить созданное ранее хранилище состояний ключей/значений counts
, представленное KTable
. На этом этапе мы можем использовать это, чтобы получить текущее количество запрошенных слов из локального хранилища состояний.
5. Тестирование
Тестирование является важной частью разработки и проверки топологии нашего приложения. Библиотека тестов Spring Kafka и контейнеры Testcontainers обеспечивают отличную поддержку для тестирования нашего приложения на различных уровнях.
5.1. Модульное тестирование
Во-первых, давайте настроим модульный тест для нашей топологии, используя TopologyTestDriver
. Это основной тестовый инструмент для тестирования приложения Kafka Streams:
@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
wordCountProcessor.buildPipeline(streamsBuilder);
Topology topology = streamsBuilder.build();
try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
TestInputTopic<String, String> inputTopic = topologyTestDriver
.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, Long> outputTopic = topologyTestDriver
.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());
inputTopic.pipeInput("key", "hello world");
inputTopic.pipeInput("key2", "hello");
assertThat(outputTopic.readKeyValuesToList())
.containsExactly(
KeyValue.pair("hello", 1L),
KeyValue.pair("world", 1L),
KeyValue.pair("hello", 2L)
);
}
}
Здесь первое, что нам нужно, — это топология
, которая инкапсулирует нашу тестируемую бизнес-логику из WordCountProcessor
. Теперь мы можем использовать это с TopologyTestDriver
для создания входных и выходных тем для нашего тестирования. Важно отметить, что это устраняет необходимость запуска брокера и проверки поведения конвейера . Другими словами, это позволяет быстро и легко проверить поведение нашего конвейера без использования настоящего брокера Kafka.
5.2. Интеграционное тестирование
Наконец, давайте воспользуемся фреймворком Testcontainers для сквозного тестирования нашего приложения. Это использует работающий брокер Kafka и запускает наше приложение для полного теста:
@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {
@Container
private static final KafkaContainer KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
private final BlockingQueue<String> output = new LinkedBlockingQueue<>();
// other test setup
@Test
void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
postMessage("test message");
startOutputTopicConsumer();
// assert correct counts on output topic
assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");
// assert correct count from REST service
assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
}
}
Здесь мы отправили POST нашему контроллеру REST, который, в свою очередь, отправляет сообщение во входную тему Kafka. В рамках установки мы также запустили потребителя Kafka. Это асинхронно прослушивает выходную тему Kafka и обновляет BlockingQueue
с полученным количеством слов.
Во время выполнения теста приложение должно обрабатывать входные сообщения. Далее мы можем проверить ожидаемый вывод как из темы, так и из хранилища состояний с помощью службы REST.
6. Заключение
В этом руководстве мы увидели, как создать простое приложение, управляемое событиями, для обработки сообщений с помощью Kafka Streams и Spring Boot.
После краткого обзора основных концепций потоковой передачи мы рассмотрели настройку и создание топологии потоков. Затем мы увидели, как интегрировать это с функциональностью REST, предоставляемой Spring Boot. Наконец, мы рассмотрели некоторые подходы к эффективному тестированию и проверке нашей топологии и поведения приложений.
Как всегда, полный исходный код доступен на GitHub .