Перейти к основному содержимому

RxJava One Observable, несколько подписчиков

· 9 мин. чтения

1. Обзор

Поведение нескольких подписчиков по умолчанию не всегда желательно. В этой статье мы расскажем, как изменить это поведение и правильно обрабатывать несколько подписчиков.

Но сначала давайте посмотрим на поведение нескольких подписчиков по умолчанию.

2. Поведение по умолчанию

Допустим, у нас есть следующий Observable :

private static Observable getObservable() {
return Observable.create(subscriber -> {
subscriber.onNext(gettingValue(1));
subscriber.onNext(gettingValue(2));

subscriber.add(Subscriptions.create(() -> {
LOGGER.info("Clear resources");
}));
});
}

Это испускает два элемента, как только Subscriber s подписывается.

В нашем примере у нас есть два Subscriber s:

LOGGER.info("Subscribing");

Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));

s1.unsubscribe();
s2.unsubscribe();

Представьте, что получение каждого элемента — затратная операция — она может включать, например, интенсивные вычисления или открытие URL-соединения.

Для простоты мы просто вернем число:

private static Integer gettingValue(int i) {
LOGGER.info("Getting " + i);
return i;
}

Вот результат:

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources

Как мы видим , получение каждого элемента, а также очистка ресурсов по умолчанию выполняется дважды — один раз для каждого Subscriber . Это не то, чего мы хотим. Класс ConnectableObservable помогает решить проблему.

3. ConnectableObservable

Класс ConnectableObservable позволяет разделить подписку с несколькими подписчиками и не выполнять базовые операции несколько раз.

Но сначала давайте создадим ConnectableObservable .

3.1. публиковать()

Метод publish() — это то, что создает ConnectableObservable из Observable :

ConnectableObservable obs = Observable.create(subscriber -> {
subscriber.onNext(gettingValue(1));
subscriber.onNext(gettingValue(2));
subscriber.add(Subscriptions.create(() -> {
LOGGER.info("Clear resources");
}));
}).publish();

Но пока это ничего не дает. Что заставляет его работать, так это метод connect() .

3.2. соединять()

Пока не будет вызван метод connect() ConnectableObservable , обратный вызов onSubscribe () Observable не запускается, `` даже если есть несколько подписчиков.

Давайте продемонстрируем это:

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();

Мы подписываемся, а затем ждем секунду перед подключением. Результат:

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources

Как мы можем видеть:

  • Получение элементов происходит только один раз, как мы и хотели

  • Очистка ресурсов также происходит только один раз

  • Получение элементов начинается через секунду после подписки.

  • Подписка больше не вызывает испускание элементов. Только connect() делает это

Эта задержка может быть полезной — иногда нам нужно дать всем подписчикам одинаковую последовательность элементов, даже если один из них подписывается раньше, чем другой.

3.3. Непротиворечивое представление наблюдаемых — connect() после подписки()

Этот вариант использования не может быть продемонстрирован на нашем предыдущем Observable , так как он работает холодно, и оба подписчика все равно получают всю последовательность элементов.

Вместо этого представьте, что генерация элемента не зависит от момента подписки, например, события, генерируемые щелчком мыши. Теперь также представьте, что второй подписчик подписывается через секунду после первого.

Первый подписчик получит все элементы, созданные в этом примере, тогда как второй подписчик получит только некоторые элементы.

С другой стороны, использование метода connect() в нужном месте может дать обоим подписчикам одинаковое представление об наблюдаемой последовательности.

Пример горячего наблюдения

Давайте создадим горячий Observable . Он будет испускать элементы при щелчке мышью на JFrame .

Каждый элемент будет координатой x клика:

private static Observable getObservable() {
return Observable.create(subscriber -> {
frame.addMouseListener(new MouseAdapter() {
@Override
public void mouseClicked(MouseEvent e) {
subscriber.onNext(e.getX());
}
});
subscriber.add(Subscriptions.create(() {
LOGGER.info("Clear resources");
for (MouseListener listener : frame.getListeners(MouseListener.class)) {
frame.removeMouseListener(listener);
}
}));
});
}

Поведение Hot Observable по умолчанию

Теперь если мы подпишем два Subscriber’а друг за другом с интервалом в секунду, запустим программу и начнем кликать, мы увидим, что первый Subscriber получит больше элементов:

public static void defaultBehaviour() throws InterruptedException {
Observable obs = getObservable();

LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe((i) ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe((i) ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("unsubscribe#1");
subscription1.unsubscribe();
Thread.sleep(1000);
LOGGER.info("unsubscribe#2");
subscription2.unsubscribe();
}
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources

connect() После подписки()

Чтобы оба подписчика получали одинаковую последовательность, мы преобразуем этот Observable в ConnectableObservable и вызовем connect() после подписки обоих Subscriber s:

public static void subscribeBeforeConnect() throws InterruptedException {

ConnectableObservable obs = getObservable().publish();

LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe(
i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe(
i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("connecting:");
Subscription s = obs.connect();
Thread.sleep(1000);
LOGGER.info("unsubscribe connected");
s.unsubscribe();
}

Теперь они получат ту же последовательность:

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources

Так что суть в том, чтобы дождаться момента, когда все подписчики будут готовы, а затем вызвать connect() .

В приложении Spring мы можем подписаться на все компоненты во время запуска приложения, например, и вызвать connect() в onApplicationEvent() .

Но вернемся к нашему примеру; обратите внимание, что все клики перед методом connect() пропускаются. Если мы не хотим пропускать элементы, а наоборот обрабатывать их, мы можем поставить connect() раньше в коде и заставить Observable генерировать события в отсутствие какого-либо Subscriber .

3.4. Принудительная подписка в отсутствие какого-либо подписчикаconnect() до subscribe()

Чтобы продемонстрировать это, давайте исправим наш пример:

public static void connectBeforeSubscribe() throws InterruptedException {
ConnectableObservable obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x)).publish();
LOGGER.info("connecting:");
Subscription s = obs.connect();
Thread.sleep(1000);
LOGGER.info("subscribing #1");
obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
s.unsubscribe();
}

Шаги относительно просты:

  • Сначала мы подключаем
  • Затем ждем одну секунду и подписываемся на первого подписчика
  • Наконец, мы ждем еще секунду и подписываемся на второго подписчика .

Обратите внимание, что мы добавили оператор doOnNext() . Здесь мы могли бы, например, хранить элементы в базе данных, но в нашем коде мы просто печатаем «сохранение…».

Если мы запустим код и начнем щелкать, мы увидим, что элементы создаются и обрабатываются сразу после вызова connect() :

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources

Если бы не было подписчиков, элементы все равно обрабатывались бы.

Таким образом, метод connect() начинает испускать и обрабатывать элементы независимо от того, подписан ли кто-то, как если бы существовал искусственный подписчик с пустым действием, которое использовало элементы.

И если подписываются какие-то настоящие Subscribers , то этот искусственный посредник просто распространяет им элементы.

Для отписки искусственного Подписчика мы выполняем:

s.unsubscribe();

Где:

Subscription s = obs.connect();

3.5. автоподключение()

Этот метод подразумевает, что connect() вызывается не до или после подписки, а автоматически, когда подписывается первый подписчик .

Используя этот метод, мы не можем сами вызвать connect() , так как возвращаемый объект является обычным Observable , который не имеет этого метода, но использует базовый ConnectableObservable :

public static void autoConnectAndSubscribe() throws InterruptedException {
Observable obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();

LOGGER.info("autoconnect()");
Thread.sleep(1000);
LOGGER.info("subscribing #1");
Subscription s1 = obs.subscribe((i) ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription s2 = obs.subscribe((i) ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));

Thread.sleep(1000);
LOGGER.info("unsubscribe 1");
s1.unsubscribe();
Thread.sleep(1000);
LOGGER.info("unsubscribe 2");
s2.unsubscribe();
}

Обратите внимание, что мы также не можем отменить подписку на искусственного Subscriber . Мы можем отписать всех настоящих подписчиков , но искусственный подписчик все равно будет обрабатывать события.

Чтобы понять это, давайте посмотрим, что происходит в конце после того, как последний подписчик отписался:

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268

Как мы видим, очистки ресурсов не происходит, а сохранение элементов с помощью doOnNext() продолжается после второй отписки. Это означает, что искусственный подписчик не отписывается, а продолжает потреблять элементы.

3.6. количество ссылок()

refCount() аналогичен autoConnect() в том смысле, что соединение также происходит автоматически, как только подписывается первый подписчик .

В отличие от autoconnect() отключение также происходит автоматически, когда последний подписчик отписывается:

public static void refCountAndSubscribe() throws InterruptedException {
Observable obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();

LOGGER.info("refcount()");
Thread.sleep(1000);
LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe(
i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe(
i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));

Thread.sleep(1000);
LOGGER.info("unsubscribe#1");
subscription1.unsubscribe();
Thread.sleep(1000);
LOGGER.info("unsubscribe#2");
subscription2.unsubscribe();
}
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources

4. Вывод

Класс ConnectableObservable помогает без особых усилий обрабатывать несколько подписчиков.

Его методы выглядят одинаково, но сильно меняют поведение подписчиков из-за тонкостей реализации, то есть даже порядок методов имеет значение.

Полный исходный код всех примеров, использованных в этой статье, можно найти в проекте GitHub .