1. Обзор
В этом руководстве мы узнаем , как преобразовать традиционные синхронные и асинхронные API в Observables
с помощью операторов RxJava2 .
Мы создадим несколько простых функций, которые помогут нам подробно обсудить эти операторы.
2. Зависимости Maven
Во- первых, мы должны добавить RxJava2 и RxJava2Extensions в качестве зависимостей Maven:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.20.4</version>
</dependency>
3. Операторы
RxJava2 определяет множество операторов для различных вариантов использования реактивного программирования.
Но мы обсудим только несколько операторов, которые обычно используются для преобразования синхронных или асинхронных методов в Observable
в зависимости от их природы. Эти операторы принимают функции в качестве аргументов и выдают значение, возвращаемое этой функцией .
Наряду с обычными операторами RxJava2 определяет еще несколько операторов для расширенных функций.
Давайте рассмотрим, как мы можем использовать эти операторы для преобразования синхронных и асинхронных методов.
4. Синхронное преобразование метода
4.1. Использование fromCallable()
Этот оператор возвращает Observable
, который, когда подписчик подписывается на него, вызывает функцию, переданную в качестве аргумента, а затем выдает значение, возвращаемое этой функцией. Давайте создадим функцию, которая возвращает целое число, и преобразуем ее:
AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();
Теперь давайте превратим его в Observable
и протестируем, подписавшись на него:
Observable<Integer> source = Observable.fromCallable(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
Оператор fromCallable()
лениво выполняет указанную функцию каждый раз, когда обернутый Observable
подписывается. Чтобы проверить это поведение, мы создали несколько подписчиков с помощью цикла.
Поскольку реактивные потоки по умолчанию асинхронны, подписчик немедленно вернется. В большинстве практических сценариев вызываемая функция будет иметь некоторую задержку для завершения своего выполнения. Итак, мы добавили максимальное время ожидания в пять секунд перед проверкой результата нашей вызываемой функции.
Также обратите внимание, что мы использовали метод test()
Observable
. Этот метод удобен при тестировании Observables
. Он создает TestObserver
и подписывается на наш Observable.
** [
](/lessons/b/-rxjava-testing) **
4.2. Использование запуска()
Оператор start()
является частью модуля RxJava2Extension
. Он вызовет указанную функцию асинхронно и вернет Observable
, который выдает результат:
Observable<Integer> source = AsyncObservable.start(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
Функция вызывается немедленно, а не всякий раз, когда подписчик подписывается на полученный Observable
. Несколько подписок на этот наблюдаемый наблюдают одно и то же возвращаемое значение.
5. Преобразование асинхронного метода
5.1. Использование fromFuture()
Как мы знаем, наиболее распространенным способом создания асинхронного метода в Java является использование реализации Future
. Метод fromFuture
принимает Future
в качестве аргумента и выдает значение, полученное из метода Future.get()
.
Во-первых, давайте сделаем функцию, которую мы создали ранее, асинхронной:
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);
Далее проведем тестирование, преобразовав его:
Observable<Integer> source = Observable.fromFuture(future);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();
И еще раз обратите внимание, что каждая подписка наблюдает одно и то же возвращаемое значение.
Теперь метод dispose()
в Observable
действительно полезен, когда речь идет о предотвращении утечки памяти. Но в этом случае это не отменит future из-за блокирующего характера Future.get()
.
Таким образом, мы можем убедиться, что будущее отменено, объединив функцию doOnDispose()
исходного
наблюдаемого объекта и метод отмены для
будущего
:
source.doOnDispose(() -> future.cancel(true));
5.2. Использование startFuture()
Как видно из названия, этот оператор немедленно запустит указанное Future
и выдаст возвращаемое значение, когда подписчик подпишется на него. В отличие от оператора fromFuture
, который кэширует результат для следующего использования, этот оператор будет выполнять асинхронный метод каждый раз, когда он подписывается :
ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();
5.3. Использование отложенного будущего()
Этот оператор объединяет несколько Observable
, возвращаемых из метода Future
, и возвращает поток возвращаемых значений, полученных из каждого Observable.
Это запустит переданную асинхронную фабричную функцию всякий раз, когда подписывается новый подписчик.
Итак, давайте сначала создадим асинхронную фабричную функцию:
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(),
counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
И тогда мы можем сделать быстрый тест:
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1,2,3);
}
exec.shutdown();
6. Заключение
В этом руководстве мы узнали, как преобразовывать синхронные и асинхронные методы в наблюдаемые объекты RxJava2.
Конечно, примеры, которые мы показали здесь, являются базовыми реализациями. Но мы можем использовать RxJava2 для более сложных приложений, таких как потоковое видео и приложения, в которых нам нужно отправлять большие объемы данных порциями.
Как обычно, все короткие примеры, которые мы здесь обсуждали, можно найти в проекте Github .