1. Обзор
В этой статье мы рассмотрим библиотеку KafkaStreams
.
KafkaStreams
разработан создателями Apache Kafka .
Основная цель этой части программного обеспечения — позволить программистам создавать эффективные потоковые приложения в режиме реального времени, которые могут работать как микросервисы.
KafkaStreams
позволяет нам получать из тем Kafka, анализировать или преобразовывать данные и, возможно, отправлять их в другую тему Kafka.
Чтобы продемонстрировать KafkaStreams,
мы создадим простое приложение, которое читает предложения из темы, подсчитывает вхождения слов и печатает количество слов.
Важно отметить, что библиотека KafkaStreams
не является реактивной и не поддерживает асинхронные операции и обработку обратного давления.
2. Зависимость от Maven
Чтобы начать писать логику обработки Stream с помощью KafkaStreams,
нам нужно добавить зависимость к kafka-streams
и kafka-clients
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
Нам также необходимо установить и запустить Apache Kafka, потому что мы будем использовать тему Kafka. Эта тема будет источником данных для нашей потоковой работы.
Мы можем скачать Kafka и другие необходимые зависимости с официального сайта .
3. Настройка ввода KafkaStreams
Первое, что мы сделаем, это определение входной темы Kafka.
Мы можем использовать скачанный нами инструмент Confluent
— он содержит сервер Kafka. Он также содержит kafka-console-producer
, который мы можем использовать для публикации сообщений в Kafka.
Для начала запустим наш кластер Kafka:
./confluent start
После запуска Kafka мы можем определить наш источник данных и имя нашего приложения, используя APPLICATION_ID_CONFIG
:
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-live-test");
Важным параметром конфигурации является файл BOOTSTRAP_SERVER_CONFIG.
Это URL-адрес нашего локального экземпляра Kafka, который мы только что запустили:
private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
Далее нам нужно передать тип ключа и значение сообщений, которые будут потребляться из inputTopic:
streamsConfiguration.put(
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
Потоковая обработка часто имеет состояние. Когда мы хотим сохранить промежуточные результаты, нам нужно указать параметр STATE_DIR_CONFIG
.
В нашем тесте мы используем локальную файловую систему:
this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(
StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString());
4. Построение потоковой топологии
Как только мы определили нашу входную тему, мы можем создать потоковую топологию — это определение того, как события должны обрабатываться и преобразовываться.
В нашем примере мы хотели бы реализовать счетчик слов. Для каждого предложения, отправленного в inputTopic,
мы хотим разбить его на слова и вычислить вхождение каждого слова.
Мы можем использовать экземпляр класса KStreamsBuilder
, чтобы начать построение нашей топологии:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
Чтобы реализовать подсчет слов, во-первых, нам нужно разделить значения с помощью регулярного выражения.
Метод split возвращает массив. Мы используем flatMapValues()
, чтобы сгладить его. В противном случае мы получили бы список массивов, и писать код с такой структурой было бы неудобно.
Наконец, мы агрегируем значения для каждого слова и вызываем функцию count()
, которая будет подсчитывать количество вхождений определенного слова.
5. Обработка результатов
Мы уже подсчитали количество слов в наших входных сообщениях. Теперь выведем результаты на стандартный вывод с помощью метода foreach()
:
wordCounts.toStream()
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
В рабочей среде часто такое потоковое задание может публиковать выходные данные в другой теме Kafka.
Мы могли бы сделать это с помощью метода to():
String outputTopic = "outputTopic";
wordCounts.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
Класс Serde
предоставляет нам предварительно сконфигурированные сериализаторы для типов Java, которые будут использоваться для сериализации объектов в массив байтов. Затем массив байтов будет отправлен в топик Kafka.
Мы используем String
как ключ к нашей теме и Long
как значение для фактического количества. Метод to()
сохранит полученные данные в outputTopic
.
6. Запуск задания KafkaStream
До этого момента мы построили топологию, которую можно выполнить. Однако работа еще не началась.
Нам нужно явно запустить нашу работу, вызвав метод start()
для экземпляра KafkaStreams
:
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();
Thread.sleep(30000);
streams.close();
Обратите внимание, что мы ждем 30 секунд для завершения задания. В реальном сценарии это задание будет выполняться все время, обрабатывая события от Kafka по мере их поступления.
Мы можем проверить нашу работу, опубликовав некоторые события в нашей теме Kafka.
Давайте запустим kafka-console-producer
и вручную отправим некоторые события в наш inputTopic:
./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"
Таким образом, мы опубликовали два события в Kafka. Наше приложение будет использовать эти события и выведет следующий вывод:
word: -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word: -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2
Мы видим, что когда пришло первое сообщение, слово пони
встретилось только один раз. Но когда мы отправили второе сообщение, слово пони
произошло во второй раз печати: « слово: пони -> 2″
.
6. Заключение
В этой статье обсуждается, как создать основное приложение для обработки потоков, используя Apache Kafka в качестве источника данных и библиотеку KafkaStreams
в качестве библиотеки обработки потоков.
Все эти примеры и фрагменты кода можно найти в проекте GitHub — это проект Maven, поэтому его легко импортировать и запускать как есть.