Перейти к основному содержимому

Путеводитель по ручьям Акка

· 7 мин. чтения

1. Обзор

В этой статье мы рассмотрим библиотеку akka-streams , созданную поверх фреймворка актеров Akka, которая соответствует манифесту реактивных потоков . Akka Streams API позволяет нам легко составлять потоки преобразования данных из независимых шагов.

Более того, вся обработка выполняется реактивным, неблокирующим и асинхронным способом.

2. Зависимости Maven

Для начала нам нужно добавить библиотеки akka-stream и akka-stream-testkit в наш pom.xml:

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-testkit_2.11</artifactId>
<version>2.5.2</version>
</dependency>

3. API потоков Akka

Для работы с Akka Streams нам необходимо знать основные концепции API:

  • Источник — точка входа в обработку в библиотеке akka-stream — мы можем создать экземпляр этого класса из нескольких источников; например, мы можем использовать метод single() , если мы хотим создать источник из одной строки , или мы можем создать источник из итерации элементов.
  • Flow — основной строительный блок обработки — каждый Flow имеет одно входное и одно выходное значение.
  • Материализатор — мы можем использовать его, если хотим, чтобы наш поток имел некоторые побочные эффекты, такие как ведение журнала или сохранение результатов ; чаще всего мы будем передавать псевдоним NotUsed в качестве материализатора , чтобы обозначить, что наш поток не должен иметь никаких побочных эффектов
  • Операция приемника — когда мы строим поток, он не выполняется, пока мы не зарегистрируем на нем операцию — это терминальная операция, которая запускает все вычисления во всем потоке

4. Создание потоков в Akka Streams

Давайте начнем с создания простого примера, где мы покажем, как создавать и комбинировать несколько Flow — для обработки потока целых чисел и вычисления среднего скользящего окна пар целых чисел из потока.

Мы проанализируем строку целых чисел, разделенных точкой с запятой, в качестве входных данных, чтобы создать наш источник akka-stream для примера.

4.1. Использование потока для анализа ввода

Во-первых, давайте создадим класс DataImporter , который будет принимать экземпляр ActorSystem , который мы будем использовать позже для создания нашего потока :

public class DataImporter {
private ActorSystem actorSystem;

// standard constructors, getters...
}

Далее давайте создадим метод parseLine , который будет генерировать список целых чисел из нашей входной строки с разделителями. Имейте в виду, что здесь мы используем Java Stream API только для синтаксического анализа:

private List<Integer> parseLine(String line) {
String[] fields = line.split(";");
return Arrays.stream(fields)
.map(Integer::parseInt)
.collect(Collectors.toList());
}

Наш исходный поток применит parseLine к нашему вводу, чтобы создать поток с типом ввода String и типом вывода Integer :

private Flow<String, Integer, NotUsed> parseContent() {
return Flow.of(String.class)
.mapConcat(this::parseLine);
}

Когда мы вызываем метод parseLine() , компилятор знает, что аргументом этой лямбда-функции будет String — такой же тип ввода для нашего Flow .

Обратите внимание, что мы используем метод mapConcat() , эквивалентный методу flatMap() в Java 8 , потому что мы хотим сгладить список целых чисел , возвращаемый функцией parseLine () , в поток целых чисел , чтобы последующие этапы нашей обработки не требовали разобраться со списком .

4.2. Использование потока для выполнения вычислений

На данный момент у нас есть поток анализируемых целых чисел. Теперь нам нужно реализовать логику, которая будет группировать все входные элементы в пары и вычислять среднее значение этих пар .

Теперь мы создадим поток целых чисел и сгруппируем их с помощью метода grouped ( ) .

Далее мы хотим рассчитать среднее значение.

Поскольку нас не интересует порядок, в котором будут обрабатываться эти средние значения, мы можем вычислять средние значения параллельно с использованием нескольких потоков с помощью метода mapAsyncUnordered() , передавая количество потоков в качестве аргумента этому методу.

Действие, которое будет передано в поток как лямбда, должно возвращать CompletableFuture , потому что это действие будет вычисляться асинхронно в отдельном потоке:

private Flow<Integer, Double, NotUsed> computeAverage() {
return Flow.of(Integer.class)
.grouped(2)
.mapAsyncUnordered(8, integers ->
CompletableFuture.supplyAsync(() -> integers.stream()
.mapToDouble(v -> v)
.average()
.orElse(-1.0)));
}

Мы вычисляем средние значения в восьми параллельных потоках. Обратите внимание, что мы используем Java 8 Stream API для вычисления среднего значения.

4.3. Объединение нескольких потоков в один поток

Flow API — это гибкая абстракция, которая позволяет нам создавать несколько экземпляров Flow для достижения конечной цели обработки . У нас могут быть гранулированные потоки, где один, например, анализирует JSON, другой выполняет некоторые преобразования, а третий собирает некоторую статистику.

Такая степень детализации поможет нам создать более тестируемый код, потому что мы можем тестировать каждый шаг обработки независимо.

Выше мы создали два потока, которые могут работать независимо друг от друга. Теперь мы хотим составить их вместе.

Во-первых, мы хотим проанализировать нашу входную строку , а затем мы хотим вычислить среднее значение для потока элементов.

Мы можем составить наши потоки, используя метод via() :

Flow<String, Double, NotUsed> calculateAverage() {
return Flow.of(String.class)
.via(parseContent())
.via(computeAverage());
}

Мы создали поток с типом ввода String и два других потока после него. Поток parseContent() принимает на вход строку и возвращает целое число в качестве вывода. Поток calculateAverage() берет это целое число и вычисляет среднее значение, возвращающее значение Double в качестве типа вывода.

5. Добавление стока в поток

Как мы уже упоминали, к этому моменту весь поток еще не выполняется, потому что он ленив. Чтобы начать выполнение потока , нам нужно определить Sink . Операция Sink может, например, сохранять данные в базу данных или отправлять результаты на какой-либо внешний веб-сервис.

Предположим, у нас есть класс AverageRepository со следующим методом save() , который записывает результаты в нашу базу данных:

CompletionStage<Double> save(Double average) {
return CompletableFuture.supplyAsync(() -> {
// write to database
return average;
});
}

Теперь мы хотим создать операцию Sink , которая использует этот метод для сохранения результатов обработки нашего потока . Чтобы создать наш Sink, нам сначала нужно создать Flow , который принимает результат нашей обработки в качестве типа ввода . Далее мы хотим сохранить все наши результаты в базу данных.

Опять же, нас не волнует порядок элементов, поэтому мы можем выполнять операции save() параллельно , используя метод mapAsyncUnordered() .

Чтобы создать Sink из потока , нам нужно вызвать toMat() с Sink.ignore() в качестве первого аргумента и Keep.right() в качестве второго, потому что мы хотим вернуть статус обработки:

private Sink<Double, CompletionStage<Done>> storeAverages() {
return Flow.of(Double.class)
.mapAsyncUnordered(4, averageRepository::save)
.toMat(Sink.ignore(), Keep.right());
}

6. Определение источника потока

Последнее, что нам нужно сделать, это создать источник из входной строки . Мы можем применить к этому источнику поток calculateAverage() , используя метод via() .

Затем, чтобы добавить приемник к обработке, нам нужно вызвать метод runWith() и передать только что созданный приемник storeAverages() :

CompletionStage<Done> calculateAverageForContent(String content) {
return Source.single(content)
.via(calculateAverage())
.runWith(storeAverages(), ActorMaterializer.create(actorSystem))
.whenComplete((d, e) -> {
if (d != null) {
System.out.println("Import finished ");
} else {
e.printStackTrace();
}
});
}

Обратите внимание, что когда обработка завершена, мы добавляем обратный вызов whenComplete() , в котором мы можем выполнить какое-либо действие в зависимости от результата обработки.

7. Тестирование потоков Akka

Мы можем протестировать нашу обработку с помощью akka-stream-testkit.

Лучший способ проверить фактическую логику обработки — проверить всю логику потока и использовать TestSink для запуска вычислений и подтверждения результатов.

В нашем тесте мы создаем поток , который хотим протестировать, а затем мы создаем источник из тестового входного содержимого:

@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
// given
Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
String input = "1;9;11;0";

// when
Source<Double, NotUsed> flow = Source.single(input).via(tested);

// then
flow
.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
.request(4)
.expectNextUnordered(5d, 5.5);
}

Мы проверяем, ожидаем ли мы четыре входных аргумента, и два результата, которые являются средними, могут поступать в любом порядке, потому что наша обработка выполняется асинхронно и параллельно.

8. Заключение

В этой статье мы рассмотрели библиотеку akka-stream .

Мы определили процесс, который объединяет несколько потоков для вычисления скользящего среднего значений элементов. Затем мы определили источник , который является точкой входа обработки потока, и приемник , запускающий фактическую обработку.

Наконец, мы написали тест для нашей обработки, используя akka-stream-testkit .

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.