Перейти к основному содержимому

Программное создание последовательностей с помощью Project Reactor

· 8 мин. чтения

1. Обзор

В этом уроке мы будем использовать основы Project Reactor , чтобы изучить несколько методов создания объектов Flux .

2. Зависимости Maven

Давайте начнем с пары зависимостей. Нам понадобятся реактор-ядро и реактор-тест :

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.6.RELEASE</version>
<scope>test</scope>
</dependency>

3. Синхронное излучение

Самый простой способ создать Flux — это Flux# generate . Этот метод использует функцию генератора для создания последовательности элементов.

Но сначала давайте определим класс для хранения наших методов, иллюстрирующих метод generate :

public class SequenceGenerator {
// methods that will follow
}

3.1. Генератор с новыми состояниями

Давайте посмотрим, как мы можем сгенерировать последовательность Фибоначчи с помощью Reactor:

public Flux<Integer> generateFibonacciWithTuples() {
return Flux.generate(
() -> Tuples.of(0, 1),
(state, sink) -> {
sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
}
);
}

Нетрудно заметить, что этот метод generate принимает в качестве аргументов две функции — Callable и BiFunction :

  • Функция Callable устанавливает начальное состояние для генератора — в данном случае это Tuples с элементами 0 и 1 .
  • Функция BiFuntion — это генератор, потребляющий SynchronousSink , а затем испускающий элемент в каждом раунде с помощью следующего метода приемника и текущего состояния.

Как следует из названия, объект SynchronousSink работает синхронно. Однако обратите внимание, что мы не можем вызывать метод next этого объекта более одного раза для каждого вызова генератора.

Давайте проверим сгенерированную последовательность с помощью StepVerifier :

@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);

StepVerifier.create(fibonacciFlux)
.expectNext(0, 1, 1, 2, 3)
.expectComplete()
.verify();
}

В этом примере подписчик запрашивает всего пять элементов, поэтому сгенерированная последовательность заканчивается номером 3 .

Как мы видим, генератор возвращает новый объект состояния, который будет использоваться в следующем проходе. Впрочем, этого делать не обязательно. Вместо этого мы можем повторно использовать экземпляр состояния для всех вызовов генератора.

3.2. Генератор с изменяемым состоянием

Предположим, мы хотим сгенерировать последовательность Фибоначчи с переработанным состоянием. Чтобы продемонстрировать этот вариант использования, давайте сначала определим класс:

public class FibonacciState {
private int former;
private int latter;

// constructor, getters and setters
}

Мы будем использовать экземпляр этого класса для хранения состояния генератора. Два свойства этого экземпляра, прежний и последний , хранят два последовательных числа в последовательности.

Если мы изменим наш первоначальный пример, мы теперь будем использовать изменяемое состояние с generate :

public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
return Flux.generate(
() -> new FibonacciState(0, 1),
(state, sink) -> {
sink.next(state.getFormer());
if (state.getLatter() > limit) {
sink.complete();
}
int temp = state.getFormer();
state.setFormer(state.getLatter());
state.setLatter(temp + state.getLatter());
return state;
});
}

Как и в предыдущем примере, этот вариант генерации имеет параметры поставщика состояния и генератора.

Поставщик состояния типа Callable просто создает объект FibonacciState с начальными свойствами 0 и 1 . Этот объект состояния будет повторно использоваться на протяжении всего жизненного цикла генератора.

Точно так же, как SynchronousSink в примере Фибоначчи с кортежами , приемник здесь производит элементы один за другим. Однако, в отличие от этого примера, генератор при каждом вызове возвращает один и тот же объект состояния.

Также обратите внимание на этот раз, чтобы избежать бесконечной последовательности, мы указываем приемнику завершиться, когда произведенное значение достигнет предела.

И давайте снова проведем быстрый тест, чтобы убедиться, что он работает:

@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();

StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
.expectNext(0, 1, 1, 2, 3, 5, 8)
.expectComplete()
.verify();
}

3.3. Вариант без гражданства

У метода generate есть еще один вариант только с одним параметром типа Consumer<SynchronousSink> . Этот вариант подходит только для создания заранее определенной последовательности, поэтому он не такой мощный. Тогда мы не будем подробно рассказывать об этом.

4. Асинхронное излучение

Синхронная эмиссия — не единственное решение для программного создания Flux .

Вместо этого мы можем использовать операторы create и push для создания нескольких элементов в раунде эмиссии асинхронным образом.

4.1. Метод создания _

Используя метод create , мы можем создавать элементы из нескольких потоков. В этом примере мы соберем элементы из двух разных источников в последовательность.

Во-первых, давайте посмотрим, чем create немного отличается от generate :

public class SequenceCreator {
public Consumer<List<Integer>> consumer;

public Flux<Integer> createNumberSequence() {
return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
}
}

В отличие от оператора generate , метод create не поддерживает состояние. И вместо того, чтобы генерировать элементы самостоятельно, эмиттер, переданный этому методу, получает элементы из внешнего источника.

Кроме того, мы видим, что оператор create запрашивает у нас FluxSink вместо SynchronousSink . С FluxSink мы можем вызывать next() столько раз, сколько нам нужно.

В нашем случае мы будем вызывать next() для каждого элемента, который у нас есть в списке элементов , выдавая их один за другим. Вскоре мы увидим, как заполнять элементы .

Наш внешний источник в данном случае — это воображаемое потребительское поле, хотя это может быть некий наблюдаемый API.

Давайте запустим оператор create , начав с двух последовательностей чисел:

@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();

// other statements described below
}

Эти последовательности, sequence1 и sequence2 , будут служить источниками элементов для сгенерированной последовательности.

Далее идут два объекта Thread , которые будут заливать элементы в публикатор:

SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
() -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
() -> sequenceCreator.consumer.accept(sequence2)
);

Когда вызывается оператор accept , элементы начинают поступать в источник последовательности.

А затем мы можем прослушать или подписаться на нашу новую консолидированную последовательность:

List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);

Подписываясь на нашу последовательность, мы указываем, что должно происходить с каждым элементом, испускаемым последовательностью. Здесь нужно добавить каждый элемент из разрозненных источников в сводный список.

Теперь мы запускаем весь процесс, в котором элементы перемещаются по двум разным потокам:

producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();

Как обычно, последним шагом является проверка результата операции:

assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);

Первые три числа в полученной последовательности происходят из sequence1 , а последние четыре — из sequence2 . Из-за характера асинхронных операций порядок элементов в этих последовательностях не гарантируется.

У метода create есть еще один вариант , принимающий аргумент типа OverflowStrategy . Как следует из названия, этот аргумент управляет обратным давлением, когда нисходящий поток не может идти в ногу с издателем. По умолчанию в таком случае издатель буферизует все элементы.

4.2. Метод толчка _

В дополнение к оператору create класс Flux имеет еще один статический метод для асинхронного создания последовательности, а именно push . Этот метод работает так же, как create , за исключением того, что он позволяет только одному производящему потоку одновременно генерировать сигналы.

Мы могли бы заменить метод create в примере, который мы только что рассмотрели, на метод push , и код все равно скомпилируется.

Однако иногда мы могли видеть ошибку утверждения, так как оператор push предотвращает одновременный вызов FluxSink#next в разных потоках. В результате мы должны использовать push , только если мы не собираемся использовать несколько потоков.

5. Обработка последовательностей

Все методы, которые мы видели до сих пор, являются статическими и позволяют создавать последовательность из заданного источника. API Flux также предоставляет метод экземпляра с именем handle для обработки последовательности, созданной издателем.

Этот оператор дескриптора принимает последовательность, выполняет некоторую обработку и, возможно, удаляет некоторые элементы. В связи с этим можно сказать, что оператор дескриптора работает так же, как карта и фильтр .

Давайте взглянем на простую иллюстрацию метода handle :

public class SequenceHandler {
public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
return sequence.handle((number, sink) -> {
if (number % 2 == 0) {
sink.next(number / 2);
}
});
}
}

В этом примере оператор дескриптора принимает последовательность чисел, деля значение на 2 , если оно четное. В случае, если значение является нечетным числом, оператор ничего не делает, то есть такое число игнорируется.

Еще одна вещь, которую следует отметить, это то, что, как и в случае с методом generate , handle использует SynchronousSink и разрешает только однократные выбросы.

И, наконец, нам нужно что-то проверить. Давайте в последний раз воспользуемся StepVerifier , чтобы убедиться, что наш обработчик работает:

@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
SequenceHandler sequenceHandler = new SequenceHandler();
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);

StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
.expectNext(0, 1, 4, 17)
.expectComplete()
.verify();
}

Среди первых 10 элементов последовательности Фибоначчи есть четыре четных числа: 0 , 2 , 8 и 34 , отсюда и аргументы, которые мы передаем методу expectNext .

6. Заключение

В этой статье мы рассмотрели различные методы API Flux , которые можно использовать для создания последовательности программным способом, в частности операторы генерации и создания .

Исходный код этого руководства доступен на GitHub . Это проект Maven, и он должен работать как есть.