Перейти к основному содержимому

RxJava 2 — текучий

· 5 мин. чтения

1. Введение

RxJava — это реализация Java Reactive Extensions, которая позволяет нам писать управляемые событиями и асинхронные приложения. Дополнительную информацию о том, как использовать RxJava, можно найти в нашей вводной статье здесь .

RxJava 2 был переписан с нуля, что принесло множество новых функций; некоторые из них были созданы в ответ на проблемы, существовавшие в предыдущей версии фреймворка.

Одной из таких функций является io.reactivex.Flowable .

2. Наблюдаемый vs. _ Текучий

В предыдущей версии RxJava был только один базовый класс для работы с источниками, поддерживающими и не поддерживающими противодавление, — Observable.

В RxJava 2 введено четкое различие между этими двумя типами источников — источники, поддерживающие противодавление, теперь представлены с помощью специального класса — Flowable.

Наблюдаемые источники не поддерживают противодавление. Из-за этого мы должны использовать его для источников, которые мы просто потребляем и на которые не можем повлиять.

Кроме того, если мы имеем дело с большим количеством элементов, могут возникнуть два возможных сценария, связанных с противодавлением , в зависимости от типа Observable .

В случае использования так называемого « холодного наблюдателя » события генерируются лениво, поэтому мы защищены от переполнения наблюдателя.

Однако при использовании « горячего наблюдаемого » это будет продолжать генерировать события, даже если потребитель не может идти в ногу.

3. Создание потока

Существуют разные способы создания Flowable . Для нас удобно, что эти методы похожи на методы в Observable в первой версии RxJava.

3.1. Простой текучий

Мы можем создать Flowable с помощью метода just() так же, как и с Observable:

Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);

Несмотря на то, что использовать just() довольно просто, создание Flowable из статических данных не очень распространено, и оно используется в целях тестирования.

3.2. Текучесть из наблюдаемого

Когда у нас есть Observable , мы можем легко преобразовать его в Flowable с помощью метода toFlowable() :

Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
.toFlowable(BackpressureStrategy.BUFFER);

Обратите внимание: чтобы выполнить преобразование, нам нужно обогатить Observable стратегией обратного давления. Мы опишем доступные стратегии в следующем разделе.

3.3. Flowable от FlowableOnSubscribe

В RxJava 2 появился функциональный интерфейс FlowableOnSubscribe , представляющий Flowable , который начинает генерировать события после того, как потребитель подпишется на него.

Благодаря этому все клиенты будут получать одинаковый набор событий, что делает FlowableOnSubscribe безопасным от противодавления.

Когда у нас есть FlowableOnSubscribe , мы можем использовать его для создания Flowable :

FlowableOnSubscribe<Integer> flowableOnSubscribe
= flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

В документации описано множество других способов создания Flowable.

4. Стратегия текучести противодавления

Некоторые методы, такие как toFlowable() или create() , принимают BackpressureStrategy в качестве аргумента.

BackpressureStrategy — это перечисление, определяющее поведение противодавления, которое мы применим к нашему Flowable .

Он может кэшировать или отбрасывать события или вообще не реализовывать какое-либо поведение, в последнем случае мы будем нести ответственность за его определение с помощью операторов обратного давления.

BackpressureStrategy похож на BackpressureMode, присутствующий в предыдущей версии RxJava.

В RxJava 2 доступно пять различных стратегий.

4.1. Буфер

Если мы используем BackpressureStrategy.BUFFER , источник будет буферизовать все события до тех пор, пока подписчик не сможет их использовать :

public void thenAllValuesAreBufferedAndReceived() {
List testList = IntStream.range(0, 100000)
.boxed()
.collect(Collectors.toList());

Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.BUFFER)
.observeOn(Schedulers.computation()).test();

testSubscriber.awaitTerminalEvent();

List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());

assertEquals(testList, receivedInts);
}

Это похоже на вызов метода onBackpressureBuffer() в Flowable , но не позволяет явно определить размер буфера или действие onOverflow.

4.2. Уронить

Мы можем использовать BackpressureStrategy.DROP , чтобы отбрасывать события, которые нельзя использовать, вместо их буферизации.

Опять же, это похоже на использование onBackpressureDrop () в Flowable :

public void whenDropStrategyUsed_thenOnBackpressureDropped() {

Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.DROP)
.observeOn(Schedulers.computation())
.test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());

assertThat(receivedInts.size() < testList.size());
assertThat(!receivedInts.contains(100000));
}

4.3. Последний

Использование BackpressureStrategy.LATEST заставит источник сохранять только самые последние события, тем самым перезаписывая любые предыдущие значения, если потребитель не может идти в ногу:

public void whenLatestStrategyUsed_thenTheLastElementReceived() {

Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.computation())
.test();

testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());

assertThat(receivedInts.size() < testList.size());
assertThat(receivedInts.contains(100000));
}

`Когда мы смотрим на код, BackpressureStrategy.LATEST и BackpressureStrategy.DROP выглядят очень похоже.`

Однако BackpressureStrategy.LATEST перезапишет элементы, которые наш подписчик не может обработать, и сохранит только самые последние, отсюда и название.

BackpressureStrategy.DROP, с другой стороны, отбрасывает элементы, которые не могут быть обработаны. Это означает, что новейшие элементы не обязательно будут испускаться.

4.4. Ошибка

Когда мы используем BackpressureStrategy.ERROR, мы просто говорим, что не ожидаем возникновения обратного давления . Следовательно, следует выдать исключение MissingBackpressureException , если потребитель не может идти в ногу с источником:

public void whenErrorStrategyUsed_thenExceptionIsThrown() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.ERROR)
.observeOn(Schedulers.computation())
.test();

subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}

4.5. Отсутствующий

Если мы используем BackpressureStrategy.MISSING , источник будет помещать элементы без отбрасывания или буферизации.

В этом случае нисходящему потоку придется иметь дело с переполнением:

public void whenMissingStrategyUsed_thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.MISSING)
.observeOn(Schedulers.computation())
.test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}

В наших тестах мы исключаем MissingbackpressureException как для стратегий ERROR , так и для стратегий MISSING . Поскольку оба они будут генерировать такое исключение, когда внутренний буфер источника будет переполнен.

Однако стоит отметить, что оба они имеют разное назначение.

Мы должны использовать первый, когда мы вообще не ожидаем обратного давления, и мы хотим, чтобы источник выдавал исключение в случае его возникновения.

Последний можно использовать, если мы не хотим указывать поведение по умолчанию при создании Flowable . И мы собираемся использовать операторы обратного давления, чтобы определить это позже.

5. Резюме

В этом руководстве мы представили новый класс, представленный в RxJava 2 , под названием Flowable.

Чтобы найти больше информации о самом Flowable и его API, мы можем обратиться к документации .

Как всегда, все примеры кода можно найти на GitHub .