1. Введение
Популярность RxJava привела к созданию множества сторонних библиотек, расширяющих его функциональность.
Многие из этих библиотек были ответом на типичные проблемы, с которыми сталкивались разработчики при использовании RxJava. RxRelay — одно из таких решений.
2. Работа с предметом
Проще говоря, Subject
действует как мост между Observable
и Observer.
Поскольку это Observer
, он может подписаться на один или несколько Observable
и получать от них события.
Кроме того, учитывая, что он одновременно является Observable
, он может повторно отправлять события или отправлять новые события своим подписчикам. Более подробную информацию о Теме
можно найти в этой статье .
Одна из проблем с Subject
заключается в том, что после получения onComplete()
или onError()
он больше не может перемещать данные. Иногда это желаемое поведение, а иногда нет.
В тех случаях, когда такое поведение нежелательно, следует рассмотреть возможность использования RxRelay
.
3. Реле
Relay
— это , по сути, Subject
, но без возможности вызова onComplete()
и onError(),
поэтому он может постоянно выдавать данные.
Это позволяет нам создавать мосты между различными типами API, не беспокоясь о случайном срабатывании состояния терминала.
Чтобы использовать RxRelay
, нам нужно добавить в наш проект следующую зависимость:
<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>1.2.0</version>
</dependency>
4. Типы реле
В библиотеке доступно три различных типа реле .
Мы быстро рассмотрим все три здесь.
4.1. Публикациярелея
Ретранслятор
этого типа будет повторно передавать все события после того, как наблюдатель
подпишется на него.
События будут отправлены всем подписчикам:
public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
PublishRelay<Integer> publishRelay = PublishRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
publishRelay.subscribe(firstObserver);
firstObserver.assertSubscribed();
publishRelay.accept(5);
publishRelay.accept(10);
publishRelay.subscribe(secondObserver);
secondObserver.assertSubscribed();
publishRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
// second receives only the last event
secondObserver.assertValue(15);
}
В этом случае нет буферизации событий, поэтому такое поведение похоже на холодный Observable.
4.2. ПоведениеRelay
Ретранслятор
этого типа будет повторно передавать самое последнее наблюдаемое событие и все последующие события после того, как наблюдатель
подпишется:
public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
behaviorRelay.accept(5);
behaviorRelay.subscribe(firstObserver);
behaviorRelay.accept(10);
behaviorRelay.subscribe(secondObserver);
behaviorRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(10, 15);
}
Когда мы создаем BehaviorRelay
, мы можем указать значение по умолчанию, которое будет генерироваться, если нет других событий для генерирования.
Чтобы указать значение по умолчанию, мы можем использовать метод createDefault()
:
public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertValue(1);
}
Если мы не хотим указывать значение по умолчанию, мы можем использовать метод create()
:
public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
4.3. ReplayRelay
Этот тип Relay
буферизует все полученные им события, а затем повторно передает их всем подписчикам, подписавшимся на него:
public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
ReplayRelay<Integer> replayRelay = ReplayRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
replayRelay.subscribe(firstObserver);
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.subscribe(secondObserver);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(5, 10, 15);
}
Все элементы буферизуются и все подписчики будут получать одни и те же события, поэтому такое поведение похоже на холодный Observable
.
Когда мы создаем ReplayRelay
, мы можем предоставить максимальный размер буфера и время жизни для событий.
Чтобы создать Relay
с ограниченным размером буфера, мы можем использовать метод createWithSize()
. Когда количество событий для буферизации превышает установленный размер буфера, предыдущие элементы будут отброшены:
public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
replayRelay.subscribe(firstObserver);
firstObserver.assertValues(15, 20);
}
Мы также можем создать ReplayRelay
с максимальным временем выхода для буферизованных событий, используя метод createWithTime()
:
public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
SingleScheduler scheduler = new SingleScheduler();
ReplayRelay<Integer> replayRelay =
ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
long current = scheduler.now(TimeUnit.MILLISECONDS);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
Thread.sleep(3000);
replayRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
5. Пользовательское реле
Все типы, описанные выше, расширяют общий абстрактный класс Relay,
что дает нам возможность написать собственный пользовательский класс Relay .
Чтобы создать собственное реле
, нам нужно реализовать три метода: accept(), hasObservers()
и subscribeActual().
Давайте напишем простую ретрансляцию, которая будет пересылать событие одному из случайно выбранных подписчиков:
public class RandomRelay extends Relay<Integer> {
Random random = new Random();
List<Observer<? super Integer>> observers = new ArrayList<>();
@Override
public void accept(Integer integer) {
int observerIndex = random.nextInt() % observers.size();
observers.get(observerIndex).onNext(integer);
}
@Override
public boolean hasObservers() {
return observers.isEmpty();
}
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observers.add(observer);
observer.onSubscribe(Disposables.fromRunnable(
() -> System.out.println("Disposed")));
}
}
Теперь мы можем проверить, что только один подписчик получит событие:
public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
RandomRelay randomRelay = new RandomRelay();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
randomRelay.subscribe(firstObserver);
randomRelay.subscribe(secondObserver);
randomRelay.accept(5);
if(firstObserver.values().isEmpty()) {
secondObserver.assertValue(5);
} else {
firstObserver.assertValue(5);
secondObserver.assertEmpty();
}
}
6. Заключение
В этом уроке мы рассмотрели RxRelay, тип, похожий на Subject
, но без возможности активировать терминальное состояние.
Более подробную информацию можно найти в документации . И, как всегда, все примеры кода можно найти на GitHub .