Перейти к основному содержимому

Объединение комплементов RxJava

· 5 мин. чтения

1. Обзор

В этом руководстве мы поиграем с Completable типом RxJava , который представляет результат вычисления без фактического значения. ``

2. Зависимость RxJava

Давайте включим зависимость RxJava 2 в наш проект Maven:

<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.2</version>
</dependency>

Обычно мы можем найти последнюю версию на Maven Central .

3. Завершаемый тип

Completable похож на Observable , за исключением того, что первый выдает либо сигналы завершения, либо сигналы об ошибках, но не элементы. Класс Completable содержит несколько удобных методов для его создания или получения из разных реактивных источников.

Мы можем создать экземпляр, который завершается немедленно, используя Completable.complete() .

Затем мы можем наблюдать за его состоянием с помощью DisposableCompletableObserver :

Completable
.complete()
.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
System.out.println("Completed!");
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}
});

Кроме того, мы можем создать экземпляр Completable из Callable, Action и Runnable :

Completable.fromRunnable(() -> {});

Кроме того, мы можем получить экземпляры Completable из других типов, используя либо Completable.from() , либо вызов ignoreElement() для самих источников Maybe , Single , Flowable и Observable :

Flowable<String> flowable = Flowable
.just("request received", "user logged in");
Completable flowableCompletable = Completable
.fromPublisher(flowable);
Completable singleCompletable = Single.just(1)
.ignoreElement();

4. Цепочка комплементов

Мы можем использовать цепочку Completables во многих реальных случаях использования, когда мы заботимся только об успехе операции:

  • Действия типа «все или ничего», такие как выполнение запроса PUT для обновления удаленного объекта с последующим обновлением локальной базы данных в случае успеха.
  • Регистрация и ведение журнала постфактум
  • Оркестрация нескольких действий, например запуск аналитического задания после завершения действия приема.

Мы будем держать примеры простыми и не зависящими от проблем. Предположим, у нас есть несколько экземпляров Completable :

Completable first = Completable
.fromSingle(Single.just(1));
Completable second = Completable
.fromRunnable(() -> {});
Throwable throwable = new RuntimeException();
Completable error = Single.error(throwable)
.ignoreElement();

Чтобы объединить два Completable в один, мы можем использовать оператор andThen () :

first
.andThen(second)
.test()
.assertComplete();

Мы можем связать столько Completables , сколько необходимо. В то же время, если хотя бы один из источников не завершится, полученный Completable также не сработает onComplete() :

first
.andThen(second)
.andThen(error)
.test()
.assertError(throwable);

Кроме того, если один из источников бесконечен или по какой-то причине не достигает onComplete , результирующий Completable никогда не будет запускать ни onComplete() , ни onError().

Хорошо, что мы все еще можем протестировать этот сценарий:

...
.andThen(Completable.never())
.test()
.assertNotComplete();

5. Составление серии Completables

Представьте, что у нас есть куча Completables. Что касается практического варианта использования, предположим, что нам нужно зарегистрировать пользователя в нескольких отдельных подсистемах.

Чтобы объединить все Completables в один, мы можем использовать семейство методов merge (). Оператор merge() позволяет подписаться на все источники.

Результирующий экземпляр завершается, когда все источники завершены . Кроме того, он завершается с onError, когда любой из источников выдает ошибку:

Completable.mergeArray(first, second)
.test()
.assertComplete();

Completable.mergeArray(first, second, error)
.test()
.assertError(throwable);

Давайте перейдем к немного другому варианту использования. Допустим, нам нужно выполнить действие для каждого элемента, полученного из Flowable.

Затем нам нужен один Completable как для завершения восходящего потока, так и для всех действий на уровне элемента. На помощь в этом случае приходит оператор flatMapCompletable ():

Completable allElementsCompletable = Flowable
.just("request received", "user logged in")
.flatMapCompletable(message -> Completable
.fromRunnable(() -> System.out.println(message))
);
allElementsCompletable
.test()
.assertComplete();

Точно так же описанный выше метод доступен для остальных базовых реактивных классов, таких как Observable , Maybe или Single.

В качестве практического контекста для flatMapCompletable() мы могли бы подумать о украшении каждого элемента каким-либо побочным эффектом. Мы можем сделать запись в журнале для каждого выполненного элемента или сделать моментальный снимок хранилища при каждом успешном действии.

Наконец, нам может понадобиться создать Completable из нескольких других источников и завершить его, как только любой из них завершится. Вот где операторы амб могут помочь.

Префикс amb является сокращением от «неоднозначный», подразумевая неопределенность в отношении того, какой именно Completable будет завершен. Например, ambArray() :

Completable.ambArray(first, Completable.never(), second)
.test()
.assertComplete();

Обратите внимание, что вышеприведенный Completable также может завершаться с помощью onError() вместо onComplete() в зависимости от того, какой исходный завершается первым:

Completable.ambArray(error, first, second)
.test()
.assertError(throwable);

Кроме того, как только первый источник прекращает работу, остальные источники гарантированно удаляются.

Это означает, что все оставшиеся работающие Completables останавливаются с помощью Disposable.dispose() , а соответствующие CompletableObservers будут отписаны.

Что касается практического примера, мы можем использовать amb() , когда мы передаем файл резервной копии в несколько эквивалентных удаленных хранилищ. И мы завершаем процесс после завершения первой лучшей резервной копии или повторяем процесс в случае ошибки.

6. Заключение

В этой статье мы кратко рассмотрели Completable тип RxJava.

Мы начали с различных вариантов получения экземпляров Completable , а затем объединили и составили Completables с помощью операторов andThen(), merge(), flatMapCompletable() и amb…() .

Мы можем найти исходники всех примеров кода на GitHub .