1. Обзор
В этом уроке мы узнаем о хуках RxJava . Мы создадим короткие примеры, чтобы продемонстрировать, как хуки работают в разных ситуациях.
2. Что такое хуки RxJava?
Как следует из названия, хуки RxJava позволяют нам подключаться к жизненному циклу Observable, Completable , Maybe , Flowable
и Single .
Кроме того, RxJava позволяет нам добавлять хуки жизненного цикла к планировщикам, возвращаемым планировщиками.
Кроме того, мы можем указать глобальный обработчик ошибок, также используя хуки.
В RxJava 1 для определения хуков используется класс RxJavaHooks .
Но механизм перехвата полностью переписан в RxJava 2. Теперь класс RxJavaHooks
больше не доступен для определения перехватчиков. Вместо этого мы должны использовать RxJavaPlugins
для реализации хуков жизненного цикла .
Класс RxJavaPlugins
имеет несколько методов установки для установки ловушек. Эти крючки являются глобальными. После того, как они установлены, мы должны либо вызвать метод reset()
класса RxJavaPlugins
, либо вызвать метод установки для отдельного хука, чтобы удалить его.
3. Хук для обработки ошибок
Мы можем использовать метод setErrorHandler()
для обработки ошибок, которые не могут быть сгенерированы, поскольку жизненный цикл нисходящего потока уже достиг конечного состояния. Давайте посмотрим, как мы можем реализовать обработчик ошибок и протестируем его:
RxJavaPlugins.setErrorHandler(throwable -> {
hookCalled = true;
});
Observable.error(new IllegalStateException()).subscribe();
assertTrue(hookCalled);
Не все исключения выбрасываются как есть. Однако RxJava проверит, является ли выданная ошибка одним из уже названных случаев ошибки, которые должны пройти как есть, в противном случае она будет заключена в UndeliverableException
. Исключения, называемые случаями ошибок:
OnErrorNotImplementedException
— когда пользователь забывает добавить обработчикonError
в методsubscribe()
MissingBackpressureException —
либо из-за ошибки оператора, либо из-за одновременногоonNext
IllegalStateException —
когда происходят общие нарушения протоколаNullPointerException —
стандартное исключение нулевого указателяIllegalArgumentException —
из-за недопустимого пользовательского вводаCompositeException —
из-за сбоя при обработке исключения
4. Крючки для Completable
RxJava Completable
имеет два хука жизненного цикла. Давайте посмотрим на них сейчас.
4.1. setOnCompletableAssembly
RxJava будет вызывать этот хук при создании экземпляров операторов и источников в Completable
. Мы можем использовать текущий объект Completable
, предоставленный в качестве аргумента функции ловушки, для любой операции над ним:
RxJavaPlugins.setOnCompletableAssembly(completable -> {
hookCalled = true;
return completable;
});
Completable.fromSingle(Single.just(1));
assertTrue(hookCalled);
4.2. setOnCompletableSubscribe
RxJava вызывает этот хук до того, как подписчик подпишется на Completable
:
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
hookCalled = true;
return observer;
});
Completable.fromSingle(Single.just(1)).test();
assertTrue(hookCalled);
5. Хуки для Observable
Далее давайте взглянем на три крючка жизненного цикла RxJava для Observable
.
5.1. setOnObservableAssembly
RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Observable
:
RxJavaPlugins.setOnObservableAssembly(observable -> {
hookCalled = true;
return observable;
});
Observable.range(1, 10);
assertTrue(hookCalled);
5.2. setOnObservableSubscribe
RxJava вызывает этот хук до того, как подписчик подпишется на Observable
:
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
hookCalled = true;
return observer;
});
Observable.range(1, 10).test();
assertTrue(hookCalled);
5.3. setOnConnectableObservableAssembly
Этот хук предназначен для ConnectableObservable
. ConnectableObservable — это
вариант самого Observable
. Единственное отличие состоит в том, что он не начинает генерировать элементы, когда на него подписаны, а только когда вызывается его метод connect() :
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
hookCalled = true;
return connectableObservable;
});
ConnectableObservable.range(1, 10).publish().connect();
assertTrue(hookCalled);
6. Крючки для Flowable
Теперь давайте взглянем на хуки жизненного цикла, определенные для Flowable
.
6.1. setOnFlowableAssembly
RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Flowable
:
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
hookCalled = true;
return flowable;
});
Flowable.range(1, 10);
assertTrue(hookCalled);
6.2. setOnFlowableSubscribe
RxJava вызывает этот хук до того, как подписчик подпишется на Flowable
:
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
hookCalled = true;
return observer;
});
Flowable.range(1, 10).test();
assertTrue(hookCalled);
6.3. setOnConnectableFlowableAssembly
RxJava вызывает этот хук, когда создает экземпляры операторов и источников в ConnectableFlowable
. Как и ConnectableObservable
, ConnectableFlowable
также начинает выдавать элементы только тогда, когда мы вызываем его метод connect() :
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
hookCalled = true;
return connectableFlowable;
});
ConnectableFlowable.range(1, 10).publish().connect();
assertTrue(hookCalled);
6.4. setOnParallelAssembly
ParallelFlowable предназначен для достижения параллелизма между несколькими издателями .
RxJava вызывает хук setOnParallelAssembly()
, когда создает экземпляры операторов и источников в ParallelFlowable
:
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
hookCalled = true;
return parallelFlowable;
});
Flowable.range(1, 10).parallel();
assertTrue(hookCalled);
7. Крючки для « Может быть»
Эмиттер Maybe
имеет два хука, определенных для управления его жизненным циклом.
7.1. setOnMaybeAssembly
RxJava вызывает этот хук, когда создает экземпляры операторов и источников на Maybe
:
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
hookCalled = true;
return maybe;
});
Maybe.just(1);
assertTrue(hookCalled);
7.2. setOnMaybeSubscribe
RxJava вызывает этот хук до того, как подписчик подпишется на Maybe
:
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
hookCalled = true;
return observer;
});
Maybe.just(1).test();
assertTrue(hookCalled);
8. Крючки для сингла
RxJava также определяет два основных хука для одного
эмиттера.
8.1. setOnSingleAssembly
RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Single
:
RxJavaPlugins.setOnSingleAssembly(single -> {
hookCalled = true;
return single;
});
Single.just(1);
assertTrue(hookCalled);
8.2. setOnSingleSubscribe
RxJava вызывает этот хук до того, как подписчик подпишется на Single
:
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
hookCalled = true;
return observer;
});
Single.just(1).test();
assertTrue(hookCalled);
9. Хуки для планировщиков
Как и эмиттеры RxJava, планировщики
также имеют множество хуков для управления их жизненным циклом. RxJava определяет общий хук, который вызывается, когда мы используем планировщик любого типа.
Кроме того, можно реализовать хуки, характерные для различных планировщиков
.
9.1. setScheduleHandler
RxJava вызывает этот хук, когда мы используем любой из планировщиков для операции:
RxJavaPlugins.setScheduleHandler((runnable) -> {
hookCalled = true;
return runnable;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
hookCalled = false;
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled);
Поскольку мы повторили операцию как с планировщиками single()
, так и с планировщиком вычислений()
, когда мы запустим это, тестовый пример дважды выведет сообщение в консоль.
9.2. Хуки для планировщика вычислений
Планировщик вычислений имеет два хука, а именно setInitComputationSchedulerHandler
и setComputationSchedulerHandler.
Когда RxJava инициализирует планировщик вычислений, он вызывает ловушку, которую мы установили с помощью функции setInitComputationSchedulerHandler
. И, кроме того, он вызывает хук, который мы установили с помощью функции setComputationSchedulerHandler,
когда мы планируем задачу с помощью Schedulers.computation()
:
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled && initHookCalled);
9.3. Хуки для IO
Scheduler
Планировщик ввода
-вывода также имеет два хука, а именно setInitIoSchedulerHandler
и setIoSchedulerHandler
:
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.io())
.test();
assertTrue(hookCalled && initHookCalled);
9.4. Хуки для одного
планировщика
Теперь давайте посмотрим на хуки для одиночного
планировщика:
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
assertTrue(hookCalled && initHookCalled);
9.5. Хуки для NewThread
Scheduler
Как и другие планировщики, планировщик NewThread
также определяет два хука:
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 15)
.map(v -> v * 2)
.subscribeOn(Schedulers.newThread())
.test();
assertTrue(hookCalled && initHookCalled);
10. Заключение
В этом руководстве мы узнали, что такое различные хуки жизненного цикла RxJava и как мы можем их реализовать. Среди этих хуков наиболее примечательным является хук обработки ошибок. Но мы можем использовать другие для целей аудита, например, для регистрации количества подписчиков и других конкретных случаев использования.
И, как обычно, все короткие примеры, которые мы здесь обсуждали, можно найти на GitHub .