Перейти к основному содержимому

Хуки RxJava

· 7 мин. чтения

1. Обзор

В этом уроке мы узнаем о хуках RxJava . Мы создадим короткие примеры, чтобы продемонстрировать, как хуки работают в разных ситуациях.

2. Что такое хуки RxJava?

Как следует из названия, хуки RxJava позволяют нам подключаться к жизненному циклу Observable, Completable , Maybe , Flowable и Single . Кроме того, RxJava позволяет нам добавлять хуки жизненного цикла к планировщикам, возвращаемым планировщиками. Кроме того, мы можем указать глобальный обработчик ошибок, также используя хуки.

В RxJava 1 для определения хуков используется класс RxJavaHooks . Но механизм перехвата полностью переписан в RxJava 2. Теперь класс RxJavaHooks больше не доступен для определения перехватчиков. Вместо этого мы должны использовать RxJavaPlugins для реализации хуков жизненного цикла .

Класс RxJavaPlugins имеет несколько методов установки для установки ловушек. Эти крючки являются глобальными. После того, как они установлены, мы должны либо вызвать метод reset() класса RxJavaPlugins , либо вызвать метод установки для отдельного хука, чтобы удалить его.

3. Хук для обработки ошибок

Мы можем использовать метод setErrorHandler() для обработки ошибок, которые не могут быть сгенерированы, поскольку жизненный цикл нисходящего потока уже достиг конечного состояния. Давайте посмотрим, как мы можем реализовать обработчик ошибок и протестируем его:

RxJavaPlugins.setErrorHandler(throwable -> {
hookCalled = true;
});

Observable.error(new IllegalStateException()).subscribe();

assertTrue(hookCalled);

Не все исключения выбрасываются как есть. Однако RxJava проверит, является ли выданная ошибка одним из уже названных случаев ошибки, которые должны пройти как есть, в противном случае она будет заключена в UndeliverableException . Исключения, называемые случаями ошибок:

  • OnErrorNotImplementedException — когда пользователь забывает добавить обработчик onError в метод subscribe()
  • MissingBackpressureException — либо из-за ошибки оператора, либо из-за одновременного onNext
  • IllegalStateException — когда происходят общие нарушения протокола
  • NullPointerException — стандартное исключение нулевого указателя
  • IllegalArgumentException — из-за недопустимого пользовательского ввода
  • CompositeException — из-за сбоя при обработке исключения

4. Крючки для Completable

RxJava Completable имеет два хука жизненного цикла. Давайте посмотрим на них сейчас.

4.1. setOnCompletableAssembly

RxJava будет вызывать этот хук при создании экземпляров операторов и источников в Completable . Мы можем использовать текущий объект Completable , предоставленный в качестве аргумента функции ловушки, для любой операции над ним:

RxJavaPlugins.setOnCompletableAssembly(completable -> {
hookCalled = true;
return completable;
});

Completable.fromSingle(Single.just(1));

assertTrue(hookCalled);

4.2. setOnCompletableSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Completable :

RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
hookCalled = true;
return observer;
});

Completable.fromSingle(Single.just(1)).test();

assertTrue(hookCalled);

5. Хуки для Observable

Далее давайте взглянем на три крючка жизненного цикла RxJava для Observable .

5.1. setOnObservableAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Observable :

RxJavaPlugins.setOnObservableAssembly(observable -> {
hookCalled = true;
return observable;
});

Observable.range(1, 10);

assertTrue(hookCalled);

5.2. setOnObservableSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Observable :

RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
hookCalled = true;
return observer;
});

Observable.range(1, 10).test();

assertTrue(hookCalled);

5.3. setOnConnectableObservableAssembly

Этот хук предназначен для ConnectableObservable . ConnectableObservable — это вариант самого Observable . Единственное отличие состоит в том, что он не начинает генерировать элементы, когда на него подписаны, а только когда вызывается его метод connect() :

RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
hookCalled = true;
return connectableObservable;
});

ConnectableObservable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6. Крючки для Flowable

Теперь давайте взглянем на хуки жизненного цикла, определенные для Flowable .

6.1. setOnFlowableAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Flowable :

RxJavaPlugins.setOnFlowableAssembly(flowable -> {
hookCalled = true;
return flowable;
});

Flowable.range(1, 10);

assertTrue(hookCalled);

6.2. setOnFlowableSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Flowable :

RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
hookCalled = true;
return observer;
});

Flowable.range(1, 10).test();

assertTrue(hookCalled);

6.3. setOnConnectableFlowableAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в ConnectableFlowable . Как и ConnectableObservable , ConnectableFlowable также начинает выдавать элементы только тогда, когда мы вызываем его метод connect() :

RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
hookCalled = true;
return connectableFlowable;
});

ConnectableFlowable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6.4. setOnParallelAssembly

ParallelFlowable предназначен для достижения параллелизма между несколькими издателями . RxJava вызывает хук setOnParallelAssembly() , когда создает экземпляры операторов и источников в ParallelFlowable :

RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
hookCalled = true;
return parallelFlowable;
});

Flowable.range(1, 10).parallel();

assertTrue(hookCalled);

7. Крючки для « Может быть»

Эмиттер Maybe имеет два хука, определенных для управления его жизненным циклом.

7.1. setOnMaybeAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников на Maybe :

RxJavaPlugins.setOnMaybeAssembly(maybe -> {
hookCalled = true;
return maybe;
});

Maybe.just(1);

assertTrue(hookCalled);

7.2. setOnMaybeSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Maybe :

RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
hookCalled = true;
return observer;
});

Maybe.just(1).test();

assertTrue(hookCalled);

8. Крючки для сингла

RxJava также определяет два основных хука для одного эмиттера.

8.1. setOnSingleAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Single :

RxJavaPlugins.setOnSingleAssembly(single -> {
hookCalled = true;
return single;
});

Single.just(1);

assertTrue(hookCalled);

8.2. setOnSingleSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Single :

RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
hookCalled = true;
return observer;
});

Single.just(1).test();

assertTrue(hookCalled);

9. Хуки для планировщиков

Как и эмиттеры RxJava, планировщики также имеют множество хуков для управления их жизненным циклом. RxJava определяет общий хук, который вызывается, когда мы используем планировщик любого типа. Кроме того, можно реализовать хуки, характерные для различных планировщиков .

9.1. setScheduleHandler

RxJava вызывает этот хук, когда мы используем любой из планировщиков для операции:

RxJavaPlugins.setScheduleHandler((runnable) -> {
hookCalled = true;
return runnable;
});

Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();

hookCalled = false;

Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();

assertTrue(hookCalled);

Поскольку мы повторили операцию как с планировщиками single() , так и с планировщиком вычислений() , когда мы запустим это, тестовый пример дважды выведет сообщение в консоль.

9.2. Хуки для планировщика вычислений

Планировщик вычислений имеет два хука, а именно setInitComputationSchedulerHandler и setComputationSchedulerHandler.

Когда RxJava инициализирует планировщик вычислений, он вызывает ловушку, которую мы установили с помощью функции setInitComputationSchedulerHandler . И, кроме того, он вызывает хук, который мы установили с помощью функции setComputationSchedulerHandler, когда мы планируем задачу с помощью Schedulers.computation() :

RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});

RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});

Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();

assertTrue(hookCalled && initHookCalled);

9.3. Хуки для IO Scheduler

Планировщик ввода -вывода также имеет два хука, а именно setInitIoSchedulerHandler и setIoSchedulerHandler :

RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});

RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});

Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.io())
.test();

assertTrue(hookCalled && initHookCalled);

9.4. Хуки для одного планировщика

Теперь давайте посмотрим на хуки для одиночного планировщика:

RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});

RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});

Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();

assertTrue(hookCalled && initHookCalled);

9.5. Хуки для NewThread Scheduler

Как и другие планировщики, планировщик NewThread также определяет два хука:

RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});

RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});

Observable.range(1, 15)
.map(v -> v * 2)
.subscribeOn(Schedulers.newThread())
.test();

assertTrue(hookCalled && initHookCalled);

10. Заключение

В этом руководстве мы узнали, что такое различные хуки жизненного цикла RxJava и как мы можем их реализовать. Среди этих хуков наиболее примечательным является хук обработки ошибок. Но мы можем использовать другие для целей аудита, например, для регистрации количества подписчиков и других конкретных случаев использования.

И, как обычно, все короткие примеры, которые мы здесь обсуждали, можно найти на GitHub .