Перейти к основному содержимому

Введение в KafkaStreams в Java

· 5 мин. чтения

1. Обзор

В этой статье мы рассмотрим библиотеку KafkaStreams .

KafkaStreams разработан создателями Apache Kafka . Основная цель этой части программного обеспечения — позволить программистам создавать эффективные потоковые приложения в режиме реального времени, которые могут работать как микросервисы.

KafkaStreams позволяет нам получать из тем Kafka, анализировать или преобразовывать данные и, возможно, отправлять их в другую тему Kafka.

Чтобы продемонстрировать KafkaStreams, мы создадим простое приложение, которое читает предложения из темы, подсчитывает вхождения слов и печатает количество слов.

Важно отметить, что библиотека KafkaStreams не является реактивной и не поддерживает асинхронные операции и обработку обратного давления.

2. Зависимость от Maven

Чтобы начать писать логику обработки Stream с помощью KafkaStreams, нам нужно добавить зависимость к kafka-streams и kafka-clients :

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>

Нам также необходимо установить и запустить Apache Kafka, потому что мы будем использовать тему Kafka. Эта тема будет источником данных для нашей потоковой работы.

Мы можем скачать Kafka и другие необходимые зависимости с официального сайта .

3. Настройка ввода KafkaStreams

Первое, что мы сделаем, это определение входной темы Kafka.

Мы можем использовать скачанный нами инструмент Confluent — он содержит сервер Kafka. Он также содержит kafka-console-producer , который мы можем использовать для публикации сообщений в Kafka.

Для начала запустим наш кластер Kafka:

./confluent start

После запуска Kafka мы можем определить наш источник данных и имя нашего приложения, используя APPLICATION_ID_CONFIG :

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-live-test");

Важным параметром конфигурации является файл BOOTSTRAP_SERVER_CONFIG. Это URL-адрес нашего локального экземпляра Kafka, который мы только что запустили:

private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);

Далее нам нужно передать тип ключа и значение сообщений, которые будут потребляться из inputTopic:

streamsConfiguration.put(
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

Потоковая обработка часто имеет состояние. Когда мы хотим сохранить промежуточные результаты, нам нужно указать параметр STATE_DIR_CONFIG .

В нашем тесте мы используем локальную файловую систему:

this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(
StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString());

4. Построение потоковой топологии

Как только мы определили нашу входную тему, мы можем создать потоковую топологию — это определение того, как события должны обрабатываться и преобразовываться.

В нашем примере мы хотели бы реализовать счетчик слов. Для каждого предложения, отправленного в inputTopic, мы хотим разбить его на слова и вычислить вхождение каждого слова.

Мы можем использовать экземпляр класса KStreamsBuilder , чтобы начать построение нашей топологии:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();

Чтобы реализовать подсчет слов, во-первых, нам нужно разделить значения с помощью регулярного выражения.

Метод split возвращает массив. Мы используем flatMapValues() , чтобы сгладить его. В противном случае мы получили бы список массивов, и писать код с такой структурой было бы неудобно.

Наконец, мы агрегируем значения для каждого слова и вызываем функцию count() , которая будет подсчитывать количество вхождений определенного слова.

5. Обработка результатов

Мы уже подсчитали количество слов в наших входных сообщениях. Теперь выведем результаты на стандартный вывод с помощью метода foreach() :

wordCounts.toStream()
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));

В рабочей среде часто такое потоковое задание может публиковать выходные данные в другой теме Kafka.

Мы могли бы сделать это с помощью метода to():

String outputTopic = "outputTopic";
wordCounts.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

Класс Serde предоставляет нам предварительно сконфигурированные сериализаторы для типов Java, которые будут использоваться для сериализации объектов в массив байтов. Затем массив байтов будет отправлен в топик Kafka.

Мы используем String как ключ к нашей теме и Long как значение для фактического количества. Метод to() сохранит полученные данные в outputTopic .

6. Запуск задания KafkaStream

До этого момента мы построили топологию, которую можно выполнить. Однако работа еще не началась.

Нам нужно явно запустить нашу работу, вызвав метод start() для экземпляра KafkaStreams :

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();

Thread.sleep(30000);
streams.close();

Обратите внимание, что мы ждем 30 секунд для завершения задания. В реальном сценарии это задание будет выполняться все время, обрабатывая события от Kafka по мере их поступления.

Мы можем проверить нашу работу, опубликовав некоторые события в нашей теме Kafka.

Давайте запустим kafka-console-producer и вручную отправим некоторые события в наш inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"

Таким образом, мы опубликовали два события в Kafka. Наше приложение будет использовать эти события и выведет следующий вывод:

word:  -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word: -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2

Мы видим, что когда пришло первое сообщение, слово пони встретилось только один раз. Но когда мы отправили второе сообщение, слово пони произошло во второй раз печати: « слово: пони -> 2″ .

6. Заключение

В этой статье обсуждается, как создать основное приложение для обработки потоков, используя Apache Kafka в качестве источника данных и библиотеку KafkaStreams в качестве библиотеки обработки потоков.

Все эти примеры и фрагменты кода можно найти в проекте GitHub — это проект Maven, поэтому его легко импортировать и запускать как есть.