Перейти к основному содержимому

Введение в Apache Beam

· 6 мин. чтения

1. Обзор

В этом руководстве мы познакомимся с Apache Beam и рассмотрим его основные концепции.

Мы начнем с демонстрации варианта использования и преимуществ использования Apache Beam, а затем рассмотрим основные понятия и терминологию. После этого мы рассмотрим простой пример, иллюстрирующий все важные аспекты Apache Beam.

2. Что такое Apache Beam?

Apache Beam (Batch + strEAM) — это унифицированная модель программирования для заданий пакетной и потоковой обработки данных. Он предоставляет комплект для разработки программного обеспечения для определения и построения конвейеров обработки данных, а также бегунов для их выполнения.

Apache Beam предназначен для обеспечения переносимого уровня программирования. Фактически Beam Pipeline Runners переводят конвейер обработки данных в API, совместимый с серверной частью по выбору пользователя. В настоящее время поддерживаются следующие серверные части распределенной обработки:

  • Апач Апекс
  • Апач Флинк
  • Шестеренчатый насос Apache (инкубационный)
  • Апач Самза
  • Апач Спарк
  • Облачный поток данных Google
  • Хейзелкаст Джет

3. Почему Apache Beam?

Apache Beam объединяет пакетную и потоковую обработку данных, в то время как другие часто делают это через отдельные API. Следовательно, очень легко изменить потоковый процесс на пакетный и наоборот, скажем, по мере изменения требований.

Apache Beam повышает мобильность и гибкость. Мы сосредотачиваемся на нашей логике, а не на основных деталях. Более того, мы можем в любой момент изменить серверную часть обработки данных.

Для Apache Beam доступны пакеты SDK для Java, Python, Go и Scala. Действительно, каждый в команде может использовать его на своем языке.

4. Основные понятия

С помощью Apache Beam мы можем создавать графы рабочих процессов (конвейеры) и выполнять их. Ключевыми понятиями модели программирования являются:

  • PCollection — представляет собой набор данных, который может быть фиксированным пакетом или потоком данных.
  • PTransform — операция обработки данных, которая принимает одну или несколько PCollection и выводит ноль или более PCollection .
  • Pipeline — представляет собой ориентированный ациклический граф PCollection и PTransform и, следовательно, инкапсулирует всю работу по обработке данных.
  • PipelineRunner — выполняет Pipeline на указанном сервере распределенной обработки.

Проще говоря, PipelineRunner выполняет Pipeline, а Pipeline состоит из PCollection и PTransform .

5. Пример подсчета слов

Теперь, когда мы изучили основные концепции Apache Beam, давайте разработаем и протестируем задачу подсчета слов.

5.1. Построение конвейера луча

Проектирование графа рабочего процесса — это первый шаг в каждом задании Apache Beam. Давайте определим шаги задачи подсчета слов:

  1. Прочитайте текст из источника.
  2. Разделите текст на список слов.
  3. Все слова в нижнем регистре.
  4. Обрезать знаки препинания.
  5. Фильтровать стоп-слова.
  6. Подсчитайте каждое уникальное слово.

Для этого нам нужно преобразовать описанные выше шаги в один конвейер , используя абстракции PCollection и PTransform .

5.2. Зависимости

Прежде чем мы сможем реализовать наш график рабочего процесса, мы должны добавить в наш проект основную зависимость Apache Beam :

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>

Beam Pipeline Runners полагаются на серверную часть распределенной обработки для выполнения задач. Давайте добавим DirectRunner в качестве зависимости времени выполнения:

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>

В отличие от других Pipeline Runner, DirectRunner не требует дополнительной настройки, что делает его хорошим выбором для начинающих.

5.3. Реализация

Apache Beam использует парадигму программирования Map-Reduce (такую же, как Java Streams ). На самом деле, прежде чем продолжить , полезно иметь базовое представление о методах reduce() , filter() , count() , map() и flatMap() .

Создание конвейера — это первое, что мы делаем:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Теперь применим нашу шестиэтапную задачу подсчета слов:

PCollection<KV<String, Long>> wordCount = p
.apply("(1) Read all lines",
TextIO.read().from(inputFilePath))
.apply("(2) Flatmap to a list of words",
FlatMapElements.into(TypeDescriptors.strings())
.via(line -> Arrays.asList(line.split("\\s"))))
.apply("(3) Lowercase all",
MapElements.into(TypeDescriptors.strings())
.via(word -> word.toLowerCase()))
.apply("(4) Trim punctuations",
MapElements.into(TypeDescriptors.strings())
.via(word -> trim(word)))
.apply("(5) Filter stopwords",
Filter.by(word -> !isStopWord(word)))
.apply("(6) Count words",
Count.perElement());

Первый (необязательный) аргумент apply() — это строка , предназначенная только для лучшей читабельности кода. Вот что делает каждый apply() в приведенном выше коде:

  1. Сначала мы читаем входной текстовый файл построчно, используя TextIO .
  2. Разделив каждую строку пробелами, мы сопоставляем ее со списком слов.
  3. Количество слов не чувствительно к регистру, поэтому мы используем строчные буквы для всех слов.
  4. Раньше мы разбивали строки по пробелам, заканчивая такими словами, как «слово!» и «слово?», поэтому убираем знаки препинания.
  5. Такие стоп-слова, как «is» и «by», встречаются почти в каждом тексте на английском языке, поэтому мы их удаляем.
  6. Наконец, мы подсчитываем уникальные слова с помощью встроенной функции Count.perElement() .

Как упоминалось ранее, конвейеры обрабатываются на распределенной серверной части. Невозможно выполнить итерацию по коллекции PCollection в памяти, поскольку она распределена между несколькими серверными частями. Вместо этого мы записываем результаты во внешнюю базу данных или файл.

Во- первых, мы преобразуем нашу PCollection в String . Затем мы используем TextIO для записи вывода:

wordCount.apply(MapElements.into(TypeDescriptors.strings())
.via(count -> count.getKey() + " --> " + count.getValue()))
.apply(TextIO.write().to(outputFilePath));

Теперь, когда наше определение конвейера завершено, мы можем запустить и протестировать его.

5.4. Запуск и тестирование

На данный момент мы определили конвейер для задачи подсчета слов. На этом этапе давайте запустим конвейер :

p.run().waitUntilFinish();

В этой строке кода Apache Beam отправит нашу задачу нескольким экземплярам DirectRunner . Следовательно, в конце будет сгенерировано несколько выходных файлов. Они будут содержать такие вещи, как:

...
apache --> 3
beam --> 5
rocks --> 2
...

Определение и запуск распределенного задания в Apache Beam так же просто и выразительно. Для сравнения, реализация подсчета слов также доступна в Apache Spark , Apache Flink и Hazelcast Jet .

6. Куда мы идем отсюда?

Мы успешно подсчитали каждое слово из нашего входного файла, но у нас пока нет отчета о наиболее часто встречающихся словах. Конечно, сортировка PCollection — это хорошая задача, которую нужно решить в качестве следующего шага.

Позже мы сможем больше узнать об управлении окнами, триггерах, метриках и более сложных преобразованиях. Документация Apache Beam содержит подробную информацию и справочные материалы.

7. Заключение

В этом руководстве мы узнали, что такое Apache Beam и почему он предпочтительнее альтернатив. Мы также продемонстрировали основные концепции Apache Beam на примере подсчета слов.

Код для этого руководства доступен на GitHub .