Перейти к основному содержимому

Введение в RxRelay для RxJava

· 5 мин. чтения

1. Введение

Популярность RxJava привела к созданию множества сторонних библиотек, расширяющих его функциональность.

Многие из этих библиотек были ответом на типичные проблемы, с которыми сталкивались разработчики при использовании RxJava. RxRelay — одно из таких решений.

2. Работа с предметом

Проще говоря, Subject действует как мост между Observable и Observer. Поскольку это Observer , он может подписаться на один или несколько Observable и получать от них события.

Кроме того, учитывая, что он одновременно является Observable , он может повторно отправлять события или отправлять новые события своим подписчикам. Более подробную информацию о Теме можно найти в этой статье .

Одна из проблем с Subject заключается в том, что после получения onComplete() или onError() он больше не может перемещать данные. Иногда это желаемое поведение, а иногда нет.

В тех случаях, когда такое поведение нежелательно, следует рассмотреть возможность использования RxRelay .

3. Реле

Relay — это , по сути, Subject , но без возможности вызова onComplete() и onError(), поэтому он может постоянно выдавать данные.

Это позволяет нам создавать мосты между различными типами API, не беспокоясь о случайном срабатывании состояния терминала.

Чтобы использовать RxRelay , нам нужно добавить в наш проект следующую зависимость:

<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>1.2.0</version>
</dependency>

4. Типы реле

В библиотеке доступно три различных типа реле . Мы быстро рассмотрим все три здесь.

4.1. Публикациярелея

Ретранслятор этого типа будет повторно передавать все события после того, как наблюдатель подпишется на него.

События будут отправлены всем подписчикам:

public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
PublishRelay<Integer> publishRelay = PublishRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();

publishRelay.subscribe(firstObserver);
firstObserver.assertSubscribed();
publishRelay.accept(5);
publishRelay.accept(10);
publishRelay.subscribe(secondObserver);
secondObserver.assertSubscribed();
publishRelay.accept(15);
firstObserver.assertValues(5, 10, 15);

// second receives only the last event
secondObserver.assertValue(15);
}

В этом случае нет буферизации событий, поэтому такое поведение похоже на холодный Observable.

4.2. ПоведениеRelay

Ретранслятор этого типа будет повторно передавать самое последнее наблюдаемое событие и все последующие события после того, как наблюдатель подпишется:

public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
behaviorRelay.accept(5);
behaviorRelay.subscribe(firstObserver);
behaviorRelay.accept(10);
behaviorRelay.subscribe(secondObserver);
behaviorRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(10, 15);
}

Когда мы создаем BehaviorRelay , мы можем указать значение по умолчанию, которое будет генерироваться, если нет других событий для генерирования.

Чтобы указать значение по умолчанию, мы можем использовать метод createDefault() :

public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertValue(1);
}

Если мы не хотим указывать значение по умолчанию, мы можем использовать метод create() :

public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}

4.3. ReplayRelay

Этот тип Relay буферизует все полученные им события, а затем повторно передает их всем подписчикам, подписавшимся на него:

public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
ReplayRelay<Integer> replayRelay = ReplayRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
replayRelay.subscribe(firstObserver);
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.subscribe(secondObserver);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(5, 10, 15);
}

Все элементы буферизуются и все подписчики будут получать одни и те же события, поэтому такое поведение похоже на холодный Observable .

Когда мы создаем ReplayRelay , мы можем предоставить максимальный размер буфера и время жизни для событий.

Чтобы создать Relay с ограниченным размером буфера, мы можем использовать метод createWithSize() . Когда количество событий для буферизации превышает установленный размер буфера, предыдущие элементы будут отброшены:

public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
replayRelay.subscribe(firstObserver);
firstObserver.assertValues(15, 20);
}

Мы также можем создать ReplayRelay с максимальным временем выхода для буферизованных событий, используя метод createWithTime() :

public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
SingleScheduler scheduler = new SingleScheduler();
ReplayRelay<Integer> replayRelay =
ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
long current = scheduler.now(TimeUnit.MILLISECONDS);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
Thread.sleep(3000);
replayRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}

5. Пользовательское реле

Все типы, описанные выше, расширяют общий абстрактный класс Relay, что дает нам возможность написать собственный пользовательский класс Relay .

Чтобы создать собственное реле , нам нужно реализовать три метода: accept(), hasObservers() и subscribeActual().

Давайте напишем простую ретрансляцию, которая будет пересылать событие одному из случайно выбранных подписчиков:

public class RandomRelay extends Relay<Integer> {
Random random = new Random();

List<Observer<? super Integer>> observers = new ArrayList<>();

@Override
public void accept(Integer integer) {
int observerIndex = random.nextInt() % observers.size();
observers.get(observerIndex).onNext(integer);
}

@Override
public boolean hasObservers() {
return observers.isEmpty();
}

@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observers.add(observer);
observer.onSubscribe(Disposables.fromRunnable(
() -> System.out.println("Disposed")));
}
}

Теперь мы можем проверить, что только один подписчик получит событие:

public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
RandomRelay randomRelay = new RandomRelay();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
randomRelay.subscribe(firstObserver);
randomRelay.subscribe(secondObserver);
randomRelay.accept(5);
if(firstObserver.values().isEmpty()) {
secondObserver.assertValue(5);
} else {
firstObserver.assertValue(5);
secondObserver.assertEmpty();
}
}

6. Заключение

В этом уроке мы рассмотрели RxRelay, тип, похожий на Subject , но без возможности активировать терминальное состояние.

Более подробную информацию можно найти в документации . И, как всегда, все примеры кода можно найти на GitHub .