Перейти к основному содержимому

Введение в Hazelcast Jet

· 4 мин. чтения

1. Введение

В этом уроке мы узнаем о Hazelcast Jet. Это механизм распределенной обработки данных, предоставленный Hazelcast, Inc. и построенный на основе Hazelcast IMDG.

Если вы хотите узнать о Hazelcast IMDG, вот статья для начала работы.

2. Что такое Hazelcast Jet?

Hazelcast Jet — это механизм распределенной обработки данных, который обрабатывает данные как потоки. Он может обрабатывать данные, которые хранятся в базе данных или файлах, а также данные, которые передаются сервером Kafka.

Более того, он может выполнять агрегатные функции над бесконечными потоками данных, разделяя потоки на подмножества и применяя агрегирование к каждому подмножеству. Эта концепция известна как работа с окнами в терминологии Jet.

Мы можем развернуть Jet в кластере машин, а затем отправить ему наши задания по обработке данных. Jet заставит всех членов кластера автоматически обрабатывать данные. Каждый член кластера потребляет часть данных, что упрощает масштабирование до любого уровня пропускной способности.

Вот типичные варианты использования Hazelcast Jet:

  • Потоковая обработка в реальном времени
  • Быстрая пакетная обработка
  • Обработка потоков Java 8 распределенным способом
  • Обработка данных в микросервисах

3. Настройка

Чтобы настроить Hazelcast Jet в нашей среде, нам просто нужно добавить одну зависимость Maven в наш pom.xml .

Вот как мы это делаем:

<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>4.2</version>
</dependency>

Включение этой зависимости загрузит файл jar размером 10 МБ, который предоставит нам всю инфраструктуру, необходимую для построения конвейера распределенной обработки данных.

Последнюю версию для Hazelcast Jet можно найти здесь .

4. Образец заявления

Чтобы узнать больше о Hazelcast Jet, мы создадим пример приложения, которое принимает на вход предложения и слово, которое нужно найти в этих предложениях, и возвращает количество указанного слова в этих предложениях.

4.1. Трубопровод

Конвейер образует базовую конструкцию для приложения Jet. Обработка внутри конвейера состоит из следующих шагов:

  • читать данные из источника
  • преобразовать данные
  • записать данные в приемник

Для нашего приложения конвейер будет читать из распределенного списка , применять преобразование группировки и агрегации и, наконец, записывать в распределенную карту .

Вот как мы пишем наш пайплайн:

private Pipeline createPipeLine() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list(LIST_NAME))
.flatMap(word -> traverseArray(word.toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.groupingKey(wholeItem())
.aggregate(counting())
.writeTo(Sinks.map(MAP_NAME));
return p;
}

Как только мы прочитали из источника, мы просматриваем данные и разделяем их по пространству, используя регулярное выражение. После этого отфильтровываем пробелы.

Наконец, мы группируем слова, объединяем их и записываем результаты на карту.

4.2. Работа

Теперь, когда наш конвейер определен, мы создаем задание для выполнения конвейера.

Вот как мы пишем функцию countWord , которая принимает параметры и возвращает количество:

public Long countWord(List<String> sentences, String word) {
long count = 0;
JetInstance jet = Jet.newJetInstance();
try {
List<String> textList = jet.getList(LIST_NAME);
textList.addAll(sentences);
Pipeline p = createPipeLine();
jet.newJob(p).join();
Map<String, Long> counts = jet.getMap(MAP_NAME);
count = counts.get(word);
} finally {
Jet.shutdownAll();
}
return count;
}

Сначала мы создаем экземпляр Jet, чтобы создать нашу работу и использовать конвейер. Затем мы копируем входной список в распределенный список, чтобы он был доступен для всех экземпляров.

Затем мы отправляем задание, используя конвейер, который мы построили выше. Метод newJob() возвращает исполняемое задание, которое запускается Jet асинхронно. Метод соединения ожидает завершения задания и выдает исключение , если задание завершается с ошибкой.

Когда задание завершается, результаты извлекаются в распределенной карте, как мы определили в нашем конвейере. Итак, мы получаем Map из экземпляра Jet и получаем количество слов против него.

Наконец, мы выключаем экземпляр Jet. Важно закрыть его после того, как наше выполнение закончилось, так как экземпляр Jet запускает свои собственные потоки . В противном случае наш Java-процесс будет по-прежнему активен даже после выхода из нашего метода.

Вот модульный тест, который проверяет код, который мы написали для Jet:

@Test
public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
List<String> sentences = new ArrayList<>();
sentences.add("The first second was alright, but the second second was tough.");
WordCounter wordCounter = new WordCounter();
long countSecond = wordCounter.countWord(sentences, "second");
assertEquals(3, countSecond);
}

5. Вывод

В этой статье мы узнали о Hazelcast Jet. Чтобы узнать больше о нем и его функциях, обратитесь к руководству .

Как обычно, код примеров, использованных в этой статье, можно найти на Github .