Перейти к основному содержимому

Введение в Apache Flink с Java

· 8 мин. чтения

Задача: Наибольшая подстрока без повторений

Для заданной строки s, найдите длину наибольшей подстроки без повторяющихся символов. Подстрока — это непрерывная непустая последовательность символов внутри строки...

ANDROMEDA 42

1. Обзор

Apache Flink — это платформа обработки больших данных, которая позволяет программистам обрабатывать огромные объемы данных очень эффективным и масштабируемым образом.

В этой статье мы познакомим вас с некоторыми основными концепциями API и стандартными преобразованиями данных, доступными в Apache Flink Java API . Плавный стиль этого API упрощает работу с центральной конструкцией Flink — распределенной коллекцией.

Во-первых, мы рассмотрим преобразования Flink DataSet API и используем их для реализации программы подсчета слов. Затем мы кратко рассмотрим API Flink DataStream , который позволяет обрабатывать потоки событий в режиме реального времени.

2. Зависимость от Maven

Для начала нам нужно добавить зависимости Maven в библиотеки flink-java и flink-test-utils :

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>1.2.0</version>
<scope>test<scope>
</dependency>

3. Основные концепции API

При работе с Flink нам нужно знать пару вещей, связанных с его API:

  • Каждая программа Flink выполняет преобразования распределенных коллекций данных. Предоставляются различные функции для преобразования данных, включая фильтрацию, сопоставление, объединение, группировку и агрегирование.
  • Операция приемника во Flink инициирует выполнение потока для получения желаемого результата программы , например, сохранения результата в файловой системе или вывода его на стандартный вывод.
  • Преобразования Flink являются ленивыми, что означает, что они не выполняются до тех пор, пока не будет вызвана операция приемника .
  • API Apache Flink поддерживает два режима работы — пакетный и в режиме реального времени. Если вы имеете дело с ограниченным источником данных, который можно обрабатывать в пакетном режиме, вы будете использовать DataSet API. Если вы хотите обрабатывать неограниченные потоки данных в режиме реального времени, вам необходимо использовать API DataStream .

4. Преобразования API набора данных

Точка входа в программу Flink — это экземпляр класса ExecutionEnvironment — он определяет контекст, в котором выполняется программа.

Давайте создадим ExecutionEnvironment , чтобы начать нашу обработку:

ExecutionEnvironment env
= ExecutionEnvironment.getExecutionEnvironment();

Обратите внимание, что когда вы запускаете приложение на локальном компьютере, оно будет выполнять обработку на локальной JVM. Если вы хотите начать обработку на кластере машин, вам нужно будет установить Apache Flink на эти машины и соответствующим образом настроить ExecutionEnvironment .

4.1. Создание набора данных

Чтобы начать выполнять преобразования данных, нам нужно предоставить нашей программе данные.

Давайте создадим экземпляр класса DataSet , используя нашу ExecutionEnvironement :

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

Вы можете создать DataSet из нескольких источников, таких как Apache Kafka, CSV, файл или практически любой другой источник данных.

4.2. Фильтровать и сокращать

Создав экземпляр класса DataSet , вы можете применить к нему преобразования.

Допустим, вы хотите отфильтровать числа, которые превышают определенный порог, а затем просуммировать их все . Для этого вы можете использовать преобразования filter() и reduce() :

int threshold = 30;
List<Integer> collect = amounts
.filter(a -> a > threshold)
.reduce((integer, t1) -> integer + t1)
.collect();

assertThat(collect.get(0)).isEqualTo(90);

Обратите внимание, что метод collect() является операцией приемника , которая инициирует фактическое преобразование данных.

4.3. карта

Допустим, у вас есть DataSet объектов Person :

private static class Person {
private int age;
private String name;

// standard constructors/getters/setters
}

Далее создадим DataSet из этих объектов:

DataSet<Person> personDataSource = env.fromCollection(
Arrays.asList(
new Person(23, "Tom"),
new Person(75, "Michael")));

Предположим, вы хотите извлечь из каждого объекта коллекции только поле age . Вы можете использовать преобразование map() , чтобы получить только определенное поле класса Person :

List<Integer> ages = personDataSource
.map(p -> p.age)
.collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);

4.4. Присоединиться

Когда у вас есть два набора данных, вы можете соединить их в некотором поле идентификатора . Для этого вы можете использовать преобразование join() .

Создадим коллекции транзакций и адресов пользователя:

Tuple3<Integer, String, String> address
= new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
= env.fromElements(address);

Tuple2<Integer, String> firstTransaction
= new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions
= env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

Первое поле в обоих кортежах имеет тип Integer , и это поле id , по которому мы хотим соединить оба набора данных.

Чтобы выполнить фактическую логику присоединения, нам нужно реализовать интерфейс KeySelector для адреса и транзакции:

private static class IdKeySelectorTransaction 
implements KeySelector<Tuple2<Integer, String>, Integer> {
@Override
public Integer getKey(Tuple2<Integer, String> value) {
return value.f0;
}
}

private static class IdKeySelectorAddress
implements KeySelector<Tuple3<Integer, String, String>, Integer> {
@Override
public Integer getKey(Tuple3<Integer, String, String> value) {
return value.f0;
}
}

Каждый селектор возвращает только поле, для которого должно быть выполнено соединение.

К сожалению, здесь невозможно использовать лямбда-выражения, потому что Flink нужна информация об универсальном типе.

Далее давайте реализуем логику слияния, используя эти селекторы:

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
joined = transactions.join(addresses)
.where(new IdKeySelectorTransaction())
.equalTo(new IdKeySelectorAddress())
.collect();

assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

4.5. Сортировать

Допустим, у вас есть следующая коллекция Tuple2:

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
fourthPerson, secondPerson, thirdPerson, firstPerson);

Если вы хотите отсортировать эту коллекцию по первому полю кортежа, вы можете использовать преобразование sortPartitions() :

List<Tuple2<Integer, String>> sorted = transactions
.sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
.collect();

assertThat(sorted)
.containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

5. Количество слов

Проблема подсчета слов обычно используется для демонстрации возможностей сред обработки больших данных. Базовое решение включает в себя подсчет вхождений слов в текстовом вводе. Давайте воспользуемся Flink для реализации решения этой проблемы.

В качестве первого шага в нашем решении мы создаем класс LineSplitter , который разбивает наш ввод на токены (слова), собирая для каждого токена Tuple2 пар ключ-значение. В каждом из этих кортежей ключом является слово, встречающееся в тексте, а значением является целая единица (1).

Этот класс реализует интерфейс FlatMapFunction , который принимает String в качестве входных данных и создает Tuple2 <String, Integer>:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
Stream.of(value.toLowerCase().split("\\W+"))
.filter(t -> t.length() > 0)
.forEach(token -> out.collect(new Tuple2<>(token, 1)));
}
}

Мы вызываем метод collect() в классе Collector , чтобы продвигать данные вперед в конвейере обработки.

Наш следующий и последний шаг — сгруппировать кортежи по их первым элементам (словам), а затем выполнить суммирование вторых элементов, чтобы получить количество вхождений слов:

public static DataSet<Tuple2<String, Integer>> startWordCount(
ExecutionEnvironment env, List<String> lines) throws Exception {
DataSet<String> text = env.fromCollection(lines);

return text.flatMap(new LineSplitter())
.groupBy(0)
.aggregate(Aggregations.SUM, 1);
}

Мы используем три типа преобразований Flink: flatMap() , groupBy() и агрегат() .

Давайте напишем тест, подтверждающий, что реализация подсчета слов работает должным образом:

List<String> lines = Arrays.asList(
"This is a first sentence",
"This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();

assertThat(collect).containsExactlyInAnyOrder(
new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

6. API потока данных

6.1. Создание потока данных

Apache Flink также поддерживает обработку потоков событий через API DataStream. Если мы хотим начать потреблять события, нам сначала нужно использовать класс StreamExecutionEnvironment :

StreamExecutionEnvironment executionEnvironment
= StreamExecutionEnvironment.getExecutionEnvironment();

Далее мы можем создать поток событий, используя среду исполнения из различных источников. Это может быть какая-нибудь шина сообщений вроде Apache Kafka , но в этом примере мы просто создадим источник из пары строковых элементов:

DataStream<String> dataStream = executionEnvironment.fromElements(
"This is a first sentence",
"This is a second sentence with a one word");

Мы можем применять преобразования к каждому элементу DataStream , как в обычном классе DataSet :

SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);

Чтобы инициировать выполнение, нам нужно вызвать операцию приемника, такую как print() , которая просто напечатает результат преобразований в стандартный вывод, а затем с помощью метода execute() в классе StreamExecutionEnvironment :

upperCase.print();
env.execute();

Он выдаст следующий результат:

1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Окно событий

При обработке потока событий в реальном времени вам иногда может понадобиться сгруппировать события вместе и применить некоторые вычисления к окну этих событий.

Предположим, у нас есть поток событий, где каждое событие представляет собой пару, состоящую из номера события и метки времени, когда событие было отправлено в нашу систему, и что мы можем допускать события, которые не по порядку, но только если они не опоздание более чем на двадцать секунд.

В этом примере давайте сначала создадим поток, имитирующий два события с разницей в несколько минут, и определим средство извлечения временных меток, указывающее наш порог задержки:

SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
= env.fromElements(
new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor
<Tuple2<Integer, Long>>(Time.seconds(20)) {

@Override
public long extractTimestamp(Tuple2<Integer, Long> element) {
return element.f1 * 1000;
}
});

Затем давайте определим оконную операцию, чтобы сгруппировать наши события в пятисекундные окна и применить преобразование к этим событиям:

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.maxBy(0, true);
reduced.print();

Он получит последний элемент каждого пятисекундного окна, поэтому выведет:

1> (15,1491221519)

Обратите внимание, что мы не видим второе событие, потому что оно прибыло позже указанного порога задержки.

7. Заключение

В этой статье мы представили инфраструктуру Apache Flink и рассмотрели некоторые преобразования, поставляемые с ее API.

Мы внедрили программу подсчета слов, используя удобный и функциональный API DataSet от Flink. Затем мы рассмотрели API DataStream и реализовали простое преобразование потока событий в реальном времени.

Реализацию всех этих примеров и фрагментов кода можно найти на GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.