1. Обзор
В этом руководстве мы познакомимся с Apache Beam и рассмотрим его основные концепции.
Мы начнем с демонстрации варианта использования и преимуществ использования Apache Beam, а затем рассмотрим основные понятия и терминологию. После этого мы рассмотрим простой пример, иллюстрирующий все важные аспекты Apache Beam.
2. Что такое Apache Beam?
Apache Beam (Batch + strEAM) — это унифицированная модель программирования для заданий пакетной и потоковой обработки данных. Он предоставляет комплект для разработки программного обеспечения для определения и построения конвейеров обработки данных, а также бегунов для их выполнения.
Apache Beam предназначен для обеспечения переносимого уровня программирования. Фактически Beam Pipeline Runners переводят конвейер обработки данных в API, совместимый с серверной частью по выбору пользователя. В настоящее время поддерживаются следующие серверные части распределенной обработки:
- Апач Апекс
- Апач Флинк
- Шестеренчатый насос Apache (инкубационный)
- Апач Самза
- Апач Спарк
- Облачный поток данных Google
- Хейзелкаст Джет
3. Почему Apache Beam?
Apache Beam объединяет пакетную и потоковую обработку данных, в то время как другие часто делают это через отдельные API. Следовательно, очень легко изменить потоковый процесс на пакетный и наоборот, скажем, по мере изменения требований.
Apache Beam повышает мобильность и гибкость. Мы сосредотачиваемся на нашей логике, а не на основных деталях. Более того, мы можем в любой момент изменить серверную часть обработки данных.
Для Apache Beam доступны пакеты SDK для Java, Python, Go и Scala. Действительно, каждый в команде может использовать его на своем языке.
4. Основные понятия
С помощью Apache Beam мы можем создавать графы рабочих процессов (конвейеры) и выполнять их. Ключевыми понятиями модели программирования являются:
PCollection
— представляет собой набор данных, который может быть фиксированным пакетом или потоком данных.PTransform
— операция обработки данных, которая принимает одну или несколькоPCollection
и выводит ноль или более PCollection.
Pipeline
— представляет собой ориентированный ациклический графPCollection
иPTransform
и, следовательно, инкапсулирует всю работу по обработке данных.PipelineRunner
— выполняетPipeline
на указанном сервере распределенной обработки.
Проще говоря, PipelineRunner
выполняет Pipeline,
а Pipeline
состоит из PCollection
и PTransform
.
5. Пример подсчета слов
Теперь, когда мы изучили основные концепции Apache Beam, давайте разработаем и протестируем задачу подсчета слов.
5.1. Построение конвейера луча
Проектирование графа рабочего процесса — это первый шаг в каждом задании Apache Beam. Давайте определим шаги задачи подсчета слов:
- Прочитайте текст из источника.
- Разделите текст на список слов.
- Все слова в нижнем регистре.
- Обрезать знаки препинания.
- Фильтровать стоп-слова.
- Подсчитайте каждое уникальное слово.
Для этого нам нужно преобразовать описанные выше шаги в один конвейер
, используя абстракции PCollection
и PTransform
.
5.2. Зависимости
Прежде чем мы сможем реализовать наш график рабочего процесса, мы должны добавить в наш проект основную зависимость Apache Beam :
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
Beam Pipeline Runners полагаются на серверную часть распределенной обработки для выполнения задач. Давайте добавим DirectRunner
в качестве зависимости времени выполнения:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
В отличие от других Pipeline Runner, DirectRunner
не требует дополнительной настройки, что делает его хорошим выбором для начинающих.
5.3. Реализация
Apache Beam использует парадигму программирования Map-Reduce (такую же, как Java Streams ). На самом деле, прежде чем продолжить , полезно иметь базовое представление о методах reduce()
, filter()
, count()
, map()
и flatMap() .
Создание конвейера
— это первое, что мы делаем:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
Теперь применим нашу шестиэтапную задачу подсчета слов:
PCollection<KV<String, Long>> wordCount = p
.apply("(1) Read all lines",
TextIO.read().from(inputFilePath))
.apply("(2) Flatmap to a list of words",
FlatMapElements.into(TypeDescriptors.strings())
.via(line -> Arrays.asList(line.split("\\s"))))
.apply("(3) Lowercase all",
MapElements.into(TypeDescriptors.strings())
.via(word -> word.toLowerCase()))
.apply("(4) Trim punctuations",
MapElements.into(TypeDescriptors.strings())
.via(word -> trim(word)))
.apply("(5) Filter stopwords",
Filter.by(word -> !isStopWord(word)))
.apply("(6) Count words",
Count.perElement());
Первый (необязательный) аргумент apply()
— это строка
, предназначенная только для лучшей читабельности кода. Вот что делает каждый apply()
в приведенном выше коде:
- Сначала мы читаем входной текстовый файл построчно, используя
TextIO
. - Разделив каждую строку пробелами, мы сопоставляем ее со списком слов.
- Количество слов не чувствительно к регистру, поэтому мы используем строчные буквы для всех слов.
- Раньше мы разбивали строки по пробелам, заканчивая такими словами, как «слово!» и «слово?», поэтому убираем знаки препинания.
- Такие стоп-слова, как «is» и «by», встречаются почти в каждом тексте на английском языке, поэтому мы их удаляем.
- Наконец, мы подсчитываем уникальные слова с помощью встроенной функции
Count.perElement()
.
Как упоминалось ранее, конвейеры обрабатываются на распределенной серверной части. Невозможно выполнить итерацию по коллекции PCollection
в памяти, поскольку она распределена между несколькими серверными частями. Вместо этого мы записываем результаты во внешнюю базу данных или файл.
Во- первых, мы преобразуем нашу PCollection
в String
. Затем мы используем TextIO
для записи вывода:
wordCount.apply(MapElements.into(TypeDescriptors.strings())
.via(count -> count.getKey() + " --> " + count.getValue()))
.apply(TextIO.write().to(outputFilePath));
Теперь, когда наше определение конвейера
завершено, мы можем запустить и протестировать его.
5.4. Запуск и тестирование
На данный момент мы определили конвейер
для задачи подсчета слов. На этом этапе давайте запустим конвейер
:
p.run().waitUntilFinish();
В этой строке кода Apache Beam отправит нашу задачу нескольким экземплярам DirectRunner
. Следовательно, в конце будет сгенерировано несколько выходных файлов. Они будут содержать такие вещи, как:
...
apache --> 3
beam --> 5
rocks --> 2
...
Определение и запуск распределенного задания в Apache Beam так же просто и выразительно. Для сравнения, реализация подсчета слов также доступна в Apache Spark , Apache Flink и Hazelcast Jet .
6. Куда мы идем отсюда?
Мы успешно подсчитали каждое слово из нашего входного файла, но у нас пока нет отчета о наиболее часто встречающихся словах. Конечно, сортировка PCollection
— это хорошая задача, которую нужно решить в качестве следующего шага.
Позже мы сможем больше узнать об управлении окнами, триггерах, метриках и более сложных преобразованиях. Документация Apache Beam содержит подробную информацию и справочные материалы.
7. Заключение
В этом руководстве мы узнали, что такое Apache Beam и почему он предпочтительнее альтернатив. Мы также продемонстрировали основные концепции Apache Beam на примере подсчета слов.
Код для этого руководства доступен на GitHub .