Перейти к основному содержимому

Объединение издателей в Project Reactor

· 5 мин. чтения

1. Обзор

В этой статье мы рассмотрим различные способы объединения издателей в Project Reactor .

2. Зависимости Maven

Давайте настроим наш пример с зависимостями Project Reactor :

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.1.4.RELEASE</version>
<scope>test</scope>
</dependency>

3. Объединение издателей

Учитывая сценарий, когда нужно работать с Flux<T> или Mono<T> , существуют разные способы объединения потоков.

Давайте создадим несколько примеров, чтобы проиллюстрировать использование статических методов в классе Flux<T> , таких как concat, concatWith, merge, zip и combLatest.

В наших примерах будут использоваться два публикатора типа Flux<Integer> , а именно evenNumbers , который представляет собой Flux of Integer и содержит последовательность четных чисел, начинающуюся с 1 ( минимальная переменная) и ограниченную 5 ( максимальная переменная).

Мы создадим oddNumbers , а также Flux типа Integer нечетных чисел:

Flux<Integer> evenNumbers = Flux
.range(min, max)
.filter(x -> x % 2 == 0); // i.e. 2, 4

Flux<Integer> oddNumbers = Flux
.range(min, max)
.filter(x -> x % 2 > 0); // ie. 1, 3, 5

3.1. c oncat ()

Метод concat выполняет конкатенацию входных данных, пересылая элементы, испускаемые источниками ниже по течению.

Конкатенация достигается путем последовательной подписки на первый источник, затем ожидания ее завершения перед подпиской на следующий, и так далее, пока не завершится последний источник. Любая ошибка немедленно прерывает последовательность и передается вниз по течению.

Вот краткий пример:

@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
Flux<Integer> fluxOfIntegers = Flux.concat(
evenNumbers,
oddNumbers);

StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}

3.2. совместить с ()

С помощью статического метода concatWith мы создадим конкатенацию двух источников типа Flux<T> в результате:

@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);

// same stepVerifier as in the concat example above
}

3.3. c объединитьПоследние()

Статический метод Flux combLatest будет генерировать данные, предоставленные комбинацией последнего опубликованного значения из каждого из источников Publisher.

Вот пример использования этого метода с двумя источниками Publisher и BiFunction в качестве параметров:

@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
Flux<Integer> fluxOfIntegers = Flux.combineLatest(
evenNumbers,
oddNumbers,
(a, b) -> a + b);

StepVerifier.create(fluxOfIntegers)
.expectNext(5) // 4 + 1
.expectNext(7) // 4 + 3
.expectNext(9) // 4 + 5
.expectComplete()
.verify();
}

Здесь мы видим, что функция combLatest применила функцию «a + b», используя последний элемент четных чисел ( 4 ) и элементы четных чисел (1,3,5) , таким образом сгенерировав последовательность 5,7,9 .

3.4. объединить()

Функция слияния выполняет слияние данных из последовательностей Publisher , содержащихся в массиве, в чередующуюся объединенную последовательность:

@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers,
oddNumbers);

StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}

Интересно отметить, что, в отличие от concat ( ленивой подписки ) , источники подписываются охотно.

Здесь мы можем увидеть другой результат функции слияния , если мы вставим задержку между элементами издателей:

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));

StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}

3.5. слияниеПоследовательность()

Метод mergeSequential объединяет данные из последовательностей Publisher , представленных в массиве, в упорядоченную объединенную последовательность.

В отличие от concat , на источники охотно подписываются.

Кроме того, в отличие от merge , их испускаемые значения объединяются в окончательную последовательность в порядке подписки:

@Test
public void testMergeSequential() {
Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
evenNumbers,
oddNumbers);

StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}

3.6. слияниеЗадержкаОррор()

Ошибка mergeDelayError объединяет данные из последовательностей Publisher, содержащихся в массиве, в чередующуюся объединенную последовательность.

В отличие от concat , на источники охотно подписываются.

Этот вариант метода статического слияния задерживает любую ошибку до тех пор, пока остальная часть невыполненной работы слияния не будет обработана.

Вот пример слиянияDelayError:

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1,
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));

StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}

3.7. слиться с()

Статический метод mergeWith объединяет данные из этого Flux и Publisher в чередующуюся объединенную последовательность.

Опять же, в отличие от concat , на внутренние источники охотно подписываются:

@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);

// same StepVerifier as in "3.4. Merge"
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}

3.8. г IP ()

Статический метод zip склеивает вместе несколько источников, т. е. ожидает, пока все источники выдадут один элемент, и объединяет эти элементы в выходное значение (созданное с помощью предоставленной функции комбинатора).

Оператор будет продолжать делать это до тех пор, пока не завершится любой из источников:

@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
Flux<Integer> fluxOfIntegers = Flux.zip(
evenNumbers,
oddNumbers,
(a, b) -> a + b);

StepVerifier.create(fluxOfIntegers)
.expectNext(3) // 2 + 1
.expectNext(7) // 4 + 3
.expectComplete()
.verify();
}

Так как в четных числах не осталось элементов для сопряжения, элемент 5 из издателя нечетных чисел игнорируется.

3.9. z ipWith()

zipWith выполняет тот же метод, что и zip , но только с двумя издателями:

@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
Flux<Integer> fluxOfIntegers = evenNumbers
.zipWith(oddNumbers, (a, b) -> a * b);

StepVerifier.create(fluxOfIntegers)
.expectNext(2) // 2 * 1
.expectNext(12) // 4 * 3
.expectComplete()
.verify();
}

4. Вывод

В этом кратком руководстве мы обнаружили несколько способов объединения издателей.

Как всегда, примеры доступны на GitHub .