Перейти к основному содержимому

Разница между RxJava API и Java 9 Flow API

· 7 мин. чтения

1. Введение

API Java Flow был представлен в Java 9 как реализация спецификации Reactive Stream.

В этом руководстве мы сначала исследуем реактивные потоки. Затем мы узнаем о его связи с RxJava и Flow API.

2. Что такое реактивные потоки?

Реактивный манифест представил Reactive Streams, чтобы указать стандарт для асинхронной обработки потоков с неблокирующим противодавлением.

Объем спецификации Reactive Stream заключается в определении минимального набора интерфейсов для достижения этих целей:

  • org.reactivestreams.Publisher — это поставщик данных, который публикует данные для подписчиков в зависимости от их спроса. ``
  • org.reactivestreams.Subscriber является потребителем данных — он может получать данные после подписки на издателя. ``
  • org.reactivestreams.Subscription создается, когда издатель принимает подписчика. ``
  • org.reactivestreams.Processor является и подписчиком, и издателем — он подписывается на издателя, обрабатывает данные и затем передает обработанные данные подписчику. ``

Flow API исходит из спецификации. Ему предшествует RxJava, но начиная с версии 2.0 RxJava также поддерживает эту спецификацию.

Мы углубимся в оба, но сначала давайте рассмотрим практический вариант использования.

3. Вариант использования

В этом руководстве мы будем использовать видеосервис прямой трансляции в качестве варианта использования.

Видео в прямом эфире, в отличие от потокового видео по запросу, не зависит от потребителя. Поэтому сервер публикует поток в своем собственном темпе, и ответственность за адаптацию лежит на потребителе.

В самом простом виде наша модель состоит из издателя видеопотока и видеоплеера в качестве подписчика.

Давайте реализуем VideoFrame как наш элемент данных:

public class VideoFrame {
private long number;
// additional data fields

// constructor, getters, setters
}

Затем давайте рассмотрим наши реализации Flow API и RxJava одну за другой.

4. Реализация с помощью Flow API

API Flow в JDK 9 соответствуют спецификации Reactive Streams. При использовании Flow API, если приложение изначально запрашивает N элементов, издатель отправляет не более N элементов подписчику.

Все интерфейсы Flow API находятся в интерфейсе java.util.concurrent.Flow . Они семантически эквивалентны своим соответствующим аналогам Reactive Stream.

Давайте реализуем VideoStreamServer в качестве издателя VideoFrame .

public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {

public VideoStreamServer() {
super(Executors.newSingleThreadExecutor(), 5);
}
}

Мы расширили наш VideoStreamServer из SubmissionPublisher вместо прямой реализации Flow::Publisher. SubmissionPublisher — это реализация JDK Flow::Publisher для асинхронной связи с подписчиками, поэтому он позволяет нашему VideoStreamServer излучать в своем собственном темпе.

Кроме того, это полезно для противодавления и обработки буфера, поскольку при вызове SubmissionPublisher::subscribe создается экземпляр BufferedSubscription , а затем добавляется новая подписка в свою цепочку подписок. BufferedSubscription может буферизовать выданные элементы до SubmissionPublisher#maxBufferCapacity .

Теперь давайте определим VideoPlayer, который использует поток VideoFrame. Следовательно, он должен реализовать Flow::Subscriber .

public class VideoPlayer implements Flow.Subscriber<VideoFrame> {

Flow.Subscription subscription = null;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(VideoFrame item) {
log.info("play #{}" , item.getNumber());
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
log.error("There is an error in video streaming:{}" , throwable.getMessage());

}

@Override
public void onComplete() {
log.error("Video has ended");
}
}

VideoPlayer подписывается на VideoStreamServer, затем после успешной подписки вызывается метод VideoPlayer :: onSubscribe , и он запрашивает один кадр. VideoPlayer ::onNext получает кадр и запрашивает новый. Количество запрошенных кадров зависит от варианта использования и реализации подписчика .

Наконец, давайте соберем вещи вместе:

VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());

// submit video frames

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber()
+ " droped because of backpressure"));
return true;
});
}, 0, 1, TimeUnit.MILLISECONDS);

sleep(1000);

5. Реализация с помощью RxJava

RxJava — это Java-реализация ReactiveX . Проект ReactiveX (или Reactive Extensions) направлен на предоставление концепции реактивного программирования. Это комбинация шаблона Observer , шаблона Iterator и функционального программирования.

Последняя основная версия RxJava — 3.x. RxJava поддерживает Reactive Streams начиная с версии 2.x с его базовым классом Flowable , но это более значительный набор, чем Reactive Streams с несколькими базовыми классами, такими как Flowable , Observable , Single , Completable.

Текучесть в качестве компонента соответствия реактивного потока представляет собой поток от 0 до N элементов с обработкой противодавления. Flowable расширяет Publisher от Reactive Streams. Поэтому многие операторы RxJava принимают Publisher напрямую и допускают прямое взаимодействие с другими реализациями Reactive Streams.

Теперь давайте создадим наш генератор видеопотока, который представляет собой бесконечный ленивый поток:

Stream<VideoFrame> videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> {
// sleep for 1ms;
return new VideoFrame(videoFrame.getNumber() + 1);
});

Затем мы определяем экземпляр Flowable для генерации кадров в отдельном потоке:

Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))

Важно отметить, что нам достаточно бесконечного потока, но если нам нужен более гибкий способ генерации нашего потока, то Flowable.create — хороший выбор.

Flowable
.create(new FlowableOnSubscribe<VideoFrame>() {
AtomicLong frame = new AtomicLong();
@Override
public void subscribe(@NonNull FlowableEmitter<VideoFrame> emitter) {
while (true) {
emitter.onNext(new VideoFrame(frame.incrementAndGet()));
//sleep for 1 ms to simualte delay
}
}
}, /* Set Backpressure Strategy Here */)

Затем на следующем шаге VideoPlayer подписывается на этот Flowable и наблюдает за элементами в отдельном потоке.

videoFlowable
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});

И, наконец, настроим стратегию противодавления . Если мы хотим остановить видео в случае потери кадров, значит, мы должны использовать BackpressureOverflowStrategy::ERROR при заполнении буфера.

Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});

6. Сравнение RxJava и Flow API

Даже в этих двух простых реализациях мы видим, насколько богат API RxJava, особенно для управления буферами, обработки ошибок и стратегии обратного давления. Это дает нам больше возможностей и меньше строк кода благодаря свободному API. Теперь рассмотрим более сложные случаи.

Предположим, что наш плеер не может отображать видеокадры без кодека. Следовательно, с помощью Flow API нам нужно реализовать процессор для имитации кодека и размещения между сервером и проигрывателем. С RxJava мы можем сделать это с помощью Flowable::flatMap или Flowable ::map .

Или давайте представим, что наш плеер также будет транслировать аудио в прямом эфире, поэтому нам нужно объединить потоки видео и аудио от разных издателей. С RxJava мы можем использовать Flowable::combineLatest , но с Flow API это непростая задача.

Хотя можно написать собственный процессор , который подписывается на оба потока и отправляет объединенные данные в наш видеоплеер. Однако реализация — головная боль.

7. Почему Flow API?

На этом этапе у нас может возникнуть вопрос: какова философия Flow API?

Если мы ищем использование Flow API в JDK, мы можем найти что-то в java.net.http и jdk.internal.net.http.

Кроме того, мы можем найти адаптеры в проекте реактора или пакете реактивного потока. Например, в org.reactivestreams.FlowAdapters есть методы для преобразования интерфейсов Flow API в интерфейсы Reactive Stream и наоборот. Поэтому это способствует взаимодействию между Flow API и библиотеками с поддержкой реактивного потока.

Все эти факты помогают нам понять назначение Flow API: он был создан как группа реактивных интерфейсов спецификаций в JDK без передачи третьим лицам. Кроме того, Java ожидает, что Flow API будет принят в качестве стандартных интерфейсов для реактивной спецификации и будет использоваться в JDK или других библиотеках на основе Java, которые реализуют реактивную спецификацию для промежуточного программного обеспечения и утилит.

8. Выводы

В этом руководстве мы познакомились со спецификацией Reactive Stream, API Flow и RxJava.

Кроме того, мы видели практический пример реализации Flow API и RxJava для потокового видео в реальном времени.

Но все аспекты Flow API и RxJava, такие как Flow::Processor , Flowable::map и Flowable::flatMap или стратегии обратного давления, здесь не рассматриваются.

Как всегда, вы найдете полный код руководства на GitHub .