Перейти к основному содержимому

Преобразование синхронных и асинхронных API в Observables с использованием RxJava2

· 5 мин. чтения

1. Обзор

В этом руководстве мы узнаем , как преобразовать традиционные синхронные и асинхронные API в Observables с помощью операторов RxJava2 .

Мы создадим несколько простых функций, которые помогут нам подробно обсудить эти операторы.

2. Зависимости Maven

Во- первых, мы должны добавить RxJava2 и RxJava2Extensions в качестве зависимостей Maven:

<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.20.4</version>
</dependency>

3. Операторы

RxJava2 определяет множество операторов для различных вариантов использования реактивного программирования.

Но мы обсудим только несколько операторов, которые обычно используются для преобразования синхронных или асинхронных методов в Observable в зависимости от их природы. Эти операторы принимают функции в качестве аргументов и выдают значение, возвращаемое этой функцией .

Наряду с обычными операторами RxJava2 определяет еще несколько операторов для расширенных функций.

Давайте рассмотрим, как мы можем использовать эти операторы для преобразования синхронных и асинхронных методов.

4. Синхронное преобразование метода

4.1. Использование fromCallable()

Этот оператор возвращает Observable , который, когда подписчик подписывается на него, вызывает функцию, переданную в качестве аргумента, а затем выдает значение, возвращаемое этой функцией. Давайте создадим функцию, которая возвращает целое число, и преобразуем ее:

AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();

Теперь давайте превратим его в Observable и протестируем, подписавшись на него:

Observable<Integer> source = Observable.fromCallable(callable);

for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}

Оператор fromCallable() лениво выполняет указанную функцию каждый раз, когда обернутый Observable подписывается. Чтобы проверить это поведение, мы создали несколько подписчиков с помощью цикла.

Поскольку реактивные потоки по умолчанию асинхронны, подписчик немедленно вернется. В большинстве практических сценариев вызываемая функция будет иметь некоторую задержку для завершения своего выполнения. Итак, мы добавили максимальное время ожидания в пять секунд перед проверкой результата нашей вызываемой функции.

Также обратите внимание, что мы использовали метод test() Observable . Этот метод удобен при тестировании Observables . Он создает TestObserver и подписывается на наш Observable. ** [](/lessons/b/-rxjava-testing) **

4.2. Использование запуска()

Оператор start() является частью модуля RxJava2Extension . Он вызовет указанную функцию асинхронно и вернет Observable , который выдает результат:

Observable<Integer> source = AsyncObservable.start(callable);

for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}

Функция вызывается немедленно, а не всякий раз, когда подписчик подписывается на полученный Observable . Несколько подписок на этот наблюдаемый наблюдают одно и то же возвращаемое значение.

5. Преобразование асинхронного метода

5.1. Использование fromFuture()

Как мы знаем, наиболее распространенным способом создания асинхронного метода в Java является использование реализации Future . Метод fromFuture принимает Future в качестве аргумента и выдает значение, полученное из метода Future.get() .

Во-первых, давайте сделаем функцию, которую мы создали ранее, асинхронной:

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);

Далее проведем тестирование, преобразовав его:

Observable<Integer> source = Observable.fromFuture(future);

for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();

И еще раз обратите внимание, что каждая подписка наблюдает одно и то же возвращаемое значение.

Теперь метод dispose() в Observable действительно полезен, когда речь идет о предотвращении утечки памяти. Но в этом случае это не отменит future из-за блокирующего характера Future.get() .

Таким образом, мы можем убедиться, что будущее отменено, объединив функцию doOnDispose() исходного наблюдаемого объекта и метод отмены для будущего :

source.doOnDispose(() -> future.cancel(true));

5.2. Использование startFuture()

Как видно из названия, этот оператор немедленно запустит указанное Future и выдаст возвращаемое значение, когда подписчик подпишется на него. В отличие от оператора fromFuture , который кэширует результат для следующего использования, этот оператор будет выполнять асинхронный метод каждый раз, когда он подписывается :

ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));

for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();

5.3. Использование отложенного будущего()

Этот оператор объединяет несколько Observable , возвращаемых из метода Future , и возвращает поток возвращаемых значений, полученных из каждого Observable. Это запустит переданную асинхронную фабричную функцию всякий раз, когда подписывается новый подписчик.

Итак, давайте сначала создадим асинхронную фабричную функцию:

List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), 
counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);

И тогда мы можем сделать быстрый тест:

Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1,2,3);
}
exec.shutdown();

6. Заключение

В этом руководстве мы узнали, как преобразовывать синхронные и асинхронные методы в наблюдаемые объекты RxJava2.

Конечно, примеры, которые мы показали здесь, являются базовыми реализациями. Но мы можем использовать RxJava2 для более сложных приложений, таких как потоковое видео и приложения, в которых нам нужно отправлять большие объемы данных порциями.

Как обычно, все короткие примеры, которые мы здесь обсуждали, можно найти в проекте Github .