Перейти к основному содержимому

Наблюдаемые служебные операторы в RxJava

· 7 мин. чтения

1. Обзор

В этой статье мы узнаем о некоторых служебных операторах для работы с Observables в RxJava и о том, как реализовать собственные.

Оператор — это функция, которая принимает и изменяет поведение вышестоящего Observable<T> и возвращает нижестоящий Observable<R> или Subscriber , где типы T и R могут совпадать, а могут и не совпадать.

Операторы оборачивают существующие Observable и улучшают их, как правило, путем перехвата подписки. Это может показаться сложным, но на самом деле это довольно гибко и не так сложно понять.

2. Делай

Существует несколько действий, которые могут изменить события жизненного цикла Observable .

Оператор d oOnNext модифицирует исходный код Observable ` , чтобы он вызывал действие при вызове onNext .`

Оператор doOnCompleted регистрирует действие, которое вызывается, если результирующий `` Observable завершается нормально , вызывая метод Observer onCompleted :

Observable.range(1, 10)
.doOnNext(r -> receivedTotal += r)
.doOnCompleted(() -> result = "Completed")
.subscribe();

assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));

Оператор doOnEach модифицирует источник Observable , чтобы он уведомлял Observer о каждом элементе и устанавливал обратный вызов, который будет вызываться каждый раз, когда элемент создается.

Оператор doOnSubscribe регистрирует действие, которое вызывается всякий раз, когда Observer подписывается на результирующий Observable .

Также есть оператор doOnUnsubscribe, который действует противоположно doOnSubscribe:

Observable.range(1, 10)
.doOnEach(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer value) {
receivedTotal += value;
}
})
.doOnSubscribe(() -> result = "Subscribed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));

Когда Observable завершается с ошибкой, мы можем использовать оператор doOnError для выполнения действия.

Оператор DoOnTerminate регистрирует действие, которое будет вызвано, когда Observable завершится успешно или с ошибкой:

thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
.single()
.doOnError(throwable -> { throw new RuntimeException("error");})
.doOnTerminate(() -> result += "doOnTerminate")
.doAfterTerminate(() -> result += "_doAfterTerminate")
.subscribe();
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));

Также есть оператор finallyDo , который устарел в пользу doAfterTerminate. Он регистрирует действие, когда Observable завершается.

3. Наблюдение против подписки

По умолчанию Observable вместе с цепочкой операторов будет работать в том же потоке, в котором вызывается его метод Subscribe .

Оператор ObserveOn указывает другой планировщик , который Observable будет использовать для отправки уведомлений наблюдателям:

Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.observeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

Мы видим, что элементы были созданы в основном потоке и были перемещены до первого вызова карты .

Но после этогоObservOn перенаправил обработку в поток вычислений , который использовался при обработке map и конечного Subscriber.

Одна из проблем, которая может возникнуть при использованииObservOn , заключается в том, что нижний поток может производить выбросы быстрее, чем верхний поток может их обработать. Это может вызвать проблемы с обратным давлением , которые нам, возможно, придется учитывать.

Чтобы указать, на каком планировщике должен работать Observable , мы можем использовать оператор subscribeOn :

Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

SubscribeOn инструктирует исходный Observable , какой поток использовать для отправки элементов — только этот поток будет передавать элементы подписчику . Его можно разместить в любом месте потока, потому что он влияет только на подписку.

По сути, мы можем использовать только один subscribeOn , но у нас может быть любое количество операторовObservOn . Мы можем легко переключать выбросы с одного потока на другой, используяObservOn .

4. Single и SingleOrDefault

Оператор Single возвращает Observable , который испускает единственный элемент, испускаемый исходным Observable:

Observable.range(1, 1)
.single()
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);

Если исходный Observable создает ноль или более одного элемента, будет выдано исключение:

Observable.empty()
.single()
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);

С другой стороны, оператор SingleOrDefault очень похож на Single, а это означает, что он также возвращает Observable , который испускает один элемент из источника, но, кроме того, мы можем указать значение по умолчанию:

Observable.empty()
.singleOrDefault("Default")
.subscribe(i -> result +=i);
assertTrue(result.equals("Default"));

Но если источник Observable генерирует более одного элемента, он все равно выдает исключение IllegalArgumentExeption:

Observable.range(1, 3)
.singleOrDefault(5)
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);

Простой вывод:

  • Если ожидается, что исходный Observable может не иметь ни одного элемента или иметь один элемент, то следует использовать SingleOrDefault.
  • Если мы имеем дело с потенциально более чем одним элементом, испускаемым в нашем Наблюдаемый и мы хотим выдать либо первое, либо последнее значение, мы можем использовать другие операторы, такие как first или last

5. Отметка времени

Оператор Timestamp прикрепляет метку времени к каждому элементу, испускаемому исходным Observable , перед тем, как повторно передать этот элемент в своей собственной последовательности. Временная метка указывает, в какое время элемент был выпущен:

Observable.range(1, 10)
.timestamp()
.map(o -> result = o.getClass().toString() )
.last()
.subscribe();

assertTrue(result.equals("class rx.schedulers.Timestamped"));

6. Задержка

Этот оператор изменяет свой исходный Observable , делая паузу в течение определенного приращения времени перед испусканием каждого из элементов исходного Observable .

Он смещает всю последовательность, используя предоставленное значение:

Observable source = Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.timestamp();

Observable delayedObservable
= source.delay(2, TimeUnit.SECONDS);

source.subscribe(
value -> System.out.println("source :" + value),
t -> System.out.println("source error"),
() -> System.out.println("source completed"));

delayedObservable.subscribe(
value -> System.out.println("delay : " + value),
t -> System.out.println("delay error"),
() -> System.out.println("delay completed"));
Thread.sleep(8000);

Существует альтернативный оператор, с помощью которого мы можем отложить подписку на исходный Observable, который называется delaySubscription .

Оператор Delay запускается в планировщике вычислений по умолчанию, но мы можем выбрать другой планировщик , передав его в качестве необязательного третьего параметра в delaySubscription . ``

7. Повторить

Repeat просто перехватывает уведомление о завершении от восходящего потока и вместо того, чтобы передавать его нисходящему потоку, повторно подписывается.

Следовательно, не гарантируется, что повтор будет повторять одну и ту же последовательность событий, но это происходит, когда восходящий поток является фиксированным потоком:

Observable.range(1, 3)
.repeat(3)
.subscribe(i -> receivedTotal += i);

assertTrue(receivedTotal == 18);

8. Кэш

Оператор кеша стоит между подпиской и нашим кастомным Observable .

Когда появляется первый подписчик, кеш делегирует подписку базовому Observable и пересылает все уведомления (события, завершения или ошибки) вниз по течению.

Однако в то же время он сохраняет копию всех уведомлений внутри. Когда последующий подписчик хочет получать push-уведомления, кеш больше не делегирует базовый Observable , а вместо этого передает кешированные значения:

Observable<Integer> source =
Observable.<Integer>create(subscriber -> {
System.out.println("Create");
subscriber.onNext(receivedTotal += 5);
subscriber.onCompleted();
}).cache();
source.subscribe(i -> {
System.out.println("element 1");
receivedTotal += 1;
});
source.subscribe(i -> {
System.out.println("element 2");
receivedTotal += 2;
});

assertTrue(receivedTotal == 8);

9. Использование

Когда наблюдатель подписывается на Observable , возвращенный из using() , он будет использовать фабричную функцию Observable для создания Observable , который наблюдатель будет… наблюдать, в то же время используя фабричную функцию ресурсов для создания любого ресурса, который мы разработали. это сделать.

Когда наблюдатель отписывается от Observable или когда Observable завершается, использование вызовет третью функцию для удаления созданного ресурса:

Observable<Character> values = Observable.using(
() -> "resource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result += v,
e -> result += e
);
assertTrue(result.equals("resource"));

10. Заключение

В этой статье мы рассказали, как использовать служебные операторы RxJava, а также как изучить их наиболее важные функции.

Истинная сила RxJava заключается в ее операторах. Декларативные преобразования потоков данных безопасны, но выразительны и гибки.

Имея прочную основу в функциональном программировании, операторы играют решающую роль в принятии RxJava. Освоение встроенных операторов — ключ к успеху в этой библиотеке.

Полный исходный код проекта, включая все используемые здесь примеры кода, можно найти на GitHub .