Перейти к основному содержимому

Введение в RxJava

· 8 мин. чтения

1. Обзор

В этой статье мы сосредоточимся на использовании Reactive Extensions (Rx) в Java для создания и использования последовательностей данных.

На первый взгляд API может показаться похожим на Java 8 Streams, но на самом деле он гораздо более гибкий и плавный, что делает его мощной парадигмой программирования.

Если вы хотите узнать больше о RxJava, ознакомьтесь с этой статьей .

2. Настройка

Чтобы использовать RxJava в нашем проекте Maven , нам нужно добавить следующую зависимость в наш pom.xml:

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java.version}</version>
</dependency>

Или для проекта Gradle:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3. Функциональные реактивные концепции

С одной стороны, функциональное программирование — это процесс создания программного обеспечения путем создания чистых функций, избегая общего состояния, изменяемых данных и побочных эффектов.

С другой стороны, реактивное программирование — это парадигма асинхронного программирования, связанная с потоками данных и распространением изменений.

Вместе функциональное реактивное программирование образует комбинацию функциональных и реактивных методов, которые могут представлять собой элегантный подход к программированию, управляемому событиями, со значениями, которые меняются со временем, и где потребитель реагирует на данные по мере их поступления.

Эта технология объединяет различные реализации ее основных принципов, некоторые авторы придумали документ, определяющий общий словарь для описания приложений нового типа.

3.1. Реактивный манифест

Reactive Manifesto — это онлайн-документ, в котором излагаются высокие стандарты для приложений в индустрии разработки программного обеспечения. Проще говоря, реактивные системы:

  • Отзывчивость – системы должны реагировать своевременно
  • Управление сообщениями — системы должны использовать асинхронную передачу сообщений между компонентами, чтобы обеспечить слабую связь.
  • Эластичность — системы должны оставаться отзывчивыми при высокой нагрузке.
  • Отказоустойчивость — системы должны оставаться отзывчивыми даже при выходе из строя некоторых компонентов.

4. Наблюдаемые

При работе с Rx необходимо понимать два ключевых типа:

  • Observable представляет собой любой объект, который может получать данные из источника данных и состояние которого может представлять интерес таким образом, что другие объекты могут зарегистрировать интерес.
  • Наблюдатель — это любой объект, который хочет получать уведомления об изменении состояния другого объекта.

Наблюдатель подписывается на наблюдаемую последовательность. Последовательность отправляет элементы наблюдателю по одному.

Наблюдатель обрабатывает каждую перед обработкой следующей. Если многие события приходят асинхронно, они должны быть сохранены в очереди или удалены.

В Rx наблюдатель никогда не будет вызываться с элементом не по порядку или вызываться до того, как обратный вызов вернется для предыдущего элемента.

4.1. Типы наблюдаемых

Есть два типа:

  • Неблокирующий — поддерживается асинхронное выполнение, и можно отказаться от подписки в любой момент потока событий. В этой статье мы сосредоточимся в основном на этом типе
  • Блокировка — все вызовы наблюдателя onNext будут синхронными, и отписаться в середине потока событий невозможно. Мы всегда можем преобразовать Observable в Blocking Observable , используя метод toBlocking:
BlockingObservable<String> blockingObservable = observable.toBlocking();

4.2. Операторы

Оператор — это функция, которая принимает одну наблюдаемую O (источник) в качестве первого аргумента и возвращает другую наблюдаемую (пункт назначения). Затем для каждого элемента, который испускает наблюдаемый источник, он применяет функцию к этому элементу, а затем выдает результат в наблюдаемом объекте назначения .

Операторы могут быть объединены в цепочку для создания сложных потоков данных, фильтрующих события на основе определенных критериев. К одному и тому же наблюдаемому можно применять несколько операторов .

Нетрудно попасть в ситуацию, в которой Observable генерирует элементы быстрее, чем оператор или наблюдатель может их использовать. Подробнее об обратном давлении можно прочитать здесь.

4.3. Создать наблюдаемый

Базовый оператор просто создает Observable , который выдает один общий экземпляр перед завершением, строку «Hello». Когда мы хотим получить информацию из Observable , мы реализуем интерфейс наблюдателя , а затем вызываем subscribe для нужного Observable:

Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);

assertTrue(result.equals("Hello"));

4.4. OnNext, OnError и OnCompleted

В интерфейсе наблюдателя есть три метода, о которых мы хотим знать:

  1. OnNext вызывается у нашего наблюдателя каждый раз, когда новое событие публикуется в прикрепленном Observable . Это метод, в котором мы будем выполнять некоторые действия для каждого события.
  2. OnCompleted вызывается, когда последовательность событий, связанных с Observable , завершена, что указывает на то, что мы не должны больше ожидать вызовов onNext для нашего наблюдателя .
  3. OnError вызывается, когда возникает необработанное исключение во время кода фреймворка RxJava или нашего кода обработки событий .

Возвращаемое значение для метода подписки Observables — это интерфейс подписки :

String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
i -> result += i, //OnNext
Throwable::printStackTrace, //OnError
() -> result += "_Completed" //OnCompleted
);
assertTrue(result.equals("abcdefg_Completed"));

5. Наблюдаемые преобразования и условные операторы

5.1. карта

Оператор карты преобразует элементы, испускаемые Observable , применяя функцию к каждому элементу.

Предположим, что есть объявленный массив строк, который содержит некоторые буквы алфавита, и мы хотим напечатать их в режиме заглавных букв:

Observable.from(letters)
.map(String::toUpperCase)
.subscribe(letter -> result += letter);
assertTrue(result.equals("ABCDEFG"));

FlatMap можно использовать для выравнивания Observables всякий раз, когда мы получаем вложенные Observables.

Подробнее о разнице между map и flatMap можно прочитать здесь .

Предположим, у нас есть метод, который возвращает Observable<String> из списка строк. Теперь мы будем печатать для каждой строки из нового Observable список заголовков на основе того, что видит подписчик :

Observable<String> getTitle() {
return Observable.from(titleList);
}
Observable.just("book1", "book2")
.flatMap(s -> getTitle())
.subscribe(l -> result += l);

assertTrue(result.equals("titletitle"));

5.2. Сканировать

Оператор сканирования последовательно применяет функцию к каждому элементу, испускаемому Observable , и выдает каждое последующее значение.

Это позволяет нам переносить состояние от события к событию:

String[] letters = {"a", "b", "c"};
Observable.from(letters)
.scan(new StringBuilder(), StringBuilder::append)
.subscribe(total -> result += total.toString());
assertTrue(result.equals("aababc"));

5.3. Группа по

Группировка по оператору позволяет нам классифицировать события во входном Observable по выходным категориям.

Предположим, что мы создали массив целых чисел от 0 до 10, затем применим группировку, которая разделит их на категории четные и нечетные :

Observable.from(numbers)
.groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
.subscribe(group ->
group.subscribe((number) -> {
if (group.getKey().toString().equals("EVEN")) {
EVEN[0] += number;
} else {
ODD[0] += number;
}
})
);
assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579"));

5.4. Фильтр

Оператор filter испускает только те элементы наблюдаемого объекта, которые проходят предикатную проверку.

Итак, давайте отфильтруем массив целых чисел для нечетных чисел:

Observable.from(numbers)
.filter(i -> (i % 2 == 1))
.subscribe(i -> result += i);

assertTrue(result.equals("13579"));

5.5. Условные операторы

DefaultIfEmpty выдает элемент из исходного Observable или элемент по умолчанию, если исходный Observable пуст:

Observable.empty()
.defaultIfEmpty("Observable is empty")
.subscribe(s -> result += s);

assertTrue(result.equals("Observable is empty"));

Следующий код выдает первую букву алфавита ' a', потому что массив букв не пуст, и это то, что он содержит в первой позиции:

Observable.from(letters)
.defaultIfEmpty("Observable is empty")
.first()
.subscribe(s -> result += s);

assertTrue(result.equals("a"));

Оператор TakeWhile отбрасывает элементы, испускаемые Observable после того, как указанное условие становится ложным:

Observable.from(numbers)
.takeWhile(i -> i < 5)
.subscribe(s -> sum[0] += s);

assertTrue(sum[0] == 10);

Конечно, есть и другие операторы, которые могли бы удовлетворить наши потребности, такие как Contain, SkipWhile, SkipUntil, TakeUntil и т. д.

6. Подключаемые наблюдаемые

ConnectableObservable похож на обычный Observable , за исключением того, что он не начинает испускать элементы, когда на него подписаны, а только когда к нему применяется оператор соединения .

Таким образом, мы можем дождаться, пока все предполагаемые наблюдатели подпишутся на Observable , прежде чем Observable начнет генерировать элементы:

String[] result = {""};
ConnectableObservable<Long> connectable
= Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0] += i);
assertFalse(result[0].equals("01"));

connectable.connect();
Thread.sleep(500);

assertTrue(result[0].equals("01"));

7. Одноместный

Single похож на Observable , который вместо серии значений выдает одно значение или уведомление об ошибке.

С этим источником данных мы можем использовать только два метода для подписки:

  • OnSuccess возвращает Single , который также вызывает указанный нами метод.
  • OnError также возвращает Single , который немедленно уведомляет подписчиков об ошибке.
String[] result = {""};
Single<String> single = Observable.just("Hello")
.toSingle()
.doOnSuccess(i -> result[0] += i)
.doOnError(error -> {
throw new RuntimeException(error.getMessage());
});
single.subscribe();

assertTrue(result[0].equals("Hello"));

8. Субъекты

Субъект — это одновременно два элемента: подписчик и наблюдаемый объект . В качестве подписчика субъект может использоваться для публикации событий, происходящих из более чем одного наблюдаемого.

И поскольку это также можно наблюдать, события от нескольких подписчиков могут быть повторно отправлены как его события любому, кто его наблюдает.

В следующем примере мы рассмотрим, как наблюдатели смогут видеть события, происходящие после подписки:

Integer subscriber1 = 0;
Integer subscriber2 = 0;
Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber1 += value;
}

@Override
public void onError(Throwable e) {
System.out.println("error");
}

@Override
public void onCompleted() {
System.out.println("Subscriber1 completed");
}
};
}

Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber2 += value;
}

@Override
public void onError(Throwable e) {
System.out.println("error");
}

@Override
public void onCompleted() {
System.out.println("Subscriber2 completed");
}
};
}

PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(getFirstObserver());
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(getSecondObserver());
subject.onNext(4);
subject.onCompleted();

assertTrue(subscriber1 + subscriber2 == 14)

9. Управление ресурсами

Использование операции позволяет нам связывать ресурсы, такие как соединение с базой данных JDBC, сетевое соединение или открытые файлы, с нашими наблюдаемыми.

Здесь мы представляем в комментариях шаги, которые нам нужно сделать для достижения этой цели, а также пример реализации:

String[] result = {""};
Observable<Character> values = Observable.using(
() -> "MyResource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result[0] += v,
e -> result[0] += e
);
assertTrue(result[0].equals("MyResource"));

10. Заключение

В этой статье мы рассказали, как использовать библиотеку RxJava, а также как изучить ее наиболее важные функции.

Полный исходный код проекта, включая все используемые здесь примеры кода, можно найти на Github .