Перейти к основному содержимому

Объединение наблюдаемых в RxJava

· 3 мин. чтения

1. Введение

В этом кратком руководстве мы обсудим различные способы объединения Observables в RxJava.

Если вы новичок в RxJava, обязательно сначала ознакомьтесь с этим вводным руководством .

Теперь, давайте прыгать прямо в.

2. Наблюдаемые

Наблюдаемые последовательности или просто Observables — это представления асинхронных потоков данных.

Они основаны на шаблоне Observer, в котором объект, называемый Observer , подписывается на элементы, испускаемые Observable .

Подписка не блокируется, поскольку Observer может реагировать на все, что Observable будет испускать в будущем. Это, в свою очередь, облегчает параллелизм.

Вот простая демонстрация в RxJava:

Observable
.from(new String[] { "John", "Doe" })
.subscribe(name -> System.out.println("Hello " + name))

3. Объединение наблюдаемых

При программировании с использованием реактивного фреймворка часто используется сочетание различных Observables .

Например, в веб-приложении нам может понадобиться получить два набора асинхронных потоков данных, которые не зависят друг от друга.

Вместо того, чтобы ждать завершения предыдущего потока перед запросом следующего потока, мы можем вызвать оба одновременно и подписаться на объединенные потоки.

В этом разделе мы обсудим некоторые из различных способов, которыми мы можем комбинировать несколько Observables в RxJava, и различные варианты использования, к которым применяется каждый метод.

3.1. Объединить

Мы можем использовать оператор слияния , чтобы объединить вывод нескольких Observables , чтобы они действовали как один:

@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();

Observable.merge(
Observable.from(new String[] {"Hello", "World"}),
Observable.from(new String[] {"I love", "RxJava"})
).subscribe(testSubscriber);

testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}

3.2. MergeDelayError

Метод mergeDelayError аналогичен слиянию в том, что он объединяет несколько Observables в один, но если во время слияния возникают ошибки, он позволяет безошибочным элементам продолжаться до распространения ошибок :

@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();

Observable.mergeDelayError(
Observable.from(new String[] { "hello", "world" }),
Observable.error(new RuntimeException("Some exception")),
Observable.from(new String[] { "rxjava" })
).subscribe(testSubscriber);

testSubscriber.assertValues("hello", "world", "rxjava");
testSubscriber.assertError(RuntimeException.class);
}

Приведенный выше пример выдает все безошибочные значения :

hello
world
rxjava

Обратите внимание, что если мы используем слияние вместо mergeDelayError , строка « rxjava» не будет выдана, потому что слияние немедленно останавливает поток данных из Observables при возникновении ошибки.

3.3. Почтовый индекс

Метод расширения zip объединяет две последовательности значений в виде пар :

@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
List<String> zippedStrings = new ArrayList<>();

Observable.zip(
Observable.from(new String[] { "Simple", "Moderate", "Complex" }),
Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);

assertThat(zippedStrings).isNotEmpty();
assertThat(zippedStrings.size()).isEqualTo(3);
assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}

3.4. Почтовый индекс с интервалом

В этом примере мы заархивируем поток с интервалом , который фактически задержит выпуск элементов первого потока:

@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);

Observable
.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
.toBlocking().subscribe(testSubscriber);

testSubscriber.assertCompleted();
testSubscriber.assertValueCount(5);
testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}

4. Резюме

В этой статье мы рассмотрели несколько методов объединения Observables с RxJava. Вы можете узнать о других методах, таких как combLatest , join , groupJoin , switchOnNext , в официальной документации RxJava .

Как всегда, исходный код этой статьи доступен в нашем репозитории GitHub .