1. Введение
API Java Flow был представлен в Java 9 как реализация спецификации Reactive Stream.
В этом руководстве мы сначала исследуем реактивные потоки. Затем мы узнаем о его связи с RxJava и Flow API.
2. Что такое реактивные потоки?
Реактивный манифест представил Reactive Streams, чтобы указать стандарт для асинхронной обработки потоков с неблокирующим противодавлением.
Объем спецификации Reactive Stream заключается в определении минимального набора интерфейсов для достижения этих целей:
org.reactivestreams.Publisher
— это поставщик данных, который публикует данные для подписчиков в зависимости от их спроса. ``org.reactivestreams.Subscriber
является потребителем данных — он может получать данные после подписки на издателя. ``org.reactivestreams.Subscription
создается, когда издатель принимает подписчика. ``org.reactivestreams.Processor
является и подписчиком, и издателем — он подписывается на издателя, обрабатывает данные и затем передает обработанные данные подписчику. ``
Flow API исходит из спецификации. Ему предшествует RxJava, но начиная с версии 2.0 RxJava также поддерживает эту спецификацию.
Мы углубимся в оба, но сначала давайте рассмотрим практический вариант использования.
3. Вариант использования
В этом руководстве мы будем использовать видеосервис прямой трансляции в качестве варианта использования.
Видео в прямом эфире, в отличие от потокового видео по запросу, не зависит от потребителя. Поэтому сервер публикует поток в своем собственном темпе, и ответственность за адаптацию лежит на потребителе.
В самом простом виде наша модель состоит из издателя видеопотока и видеоплеера в качестве подписчика.
Давайте реализуем VideoFrame
как наш элемент данных:
public class VideoFrame {
private long number;
// additional data fields
// constructor, getters, setters
}
Затем давайте рассмотрим наши реализации Flow API и RxJava одну за другой.
4. Реализация с помощью Flow API
API Flow в JDK 9 соответствуют спецификации Reactive Streams. При использовании Flow API, если приложение изначально запрашивает N элементов, издатель отправляет не более N элементов подписчику.
Все интерфейсы Flow API находятся в интерфейсе java.util.concurrent.Flow
. Они семантически эквивалентны своим соответствующим аналогам Reactive Stream.
Давайте реализуем VideoStreamServer
в качестве издателя VideoFrame
.
public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
public VideoStreamServer() {
super(Executors.newSingleThreadExecutor(), 5);
}
}
Мы расширили наш VideoStreamServer
из SubmissionPublisher
вместо прямой реализации Flow::Publisher.
SubmissionPublisher
— это реализация JDK Flow::Publisher
для асинхронной связи с подписчиками, поэтому он позволяет нашему VideoStreamServer
излучать в своем собственном темпе.
Кроме того, это полезно для противодавления и обработки буфера, поскольку при вызове SubmissionPublisher::subscribe
создается экземпляр BufferedSubscription
, а затем добавляется новая подписка в свою цепочку подписок. BufferedSubscription
может буферизовать выданные элементы до SubmissionPublisher#maxBufferCapacity
.
Теперь давайте определим VideoPlayer,
который использует поток VideoFrame.
Следовательно, он должен реализовать Flow::Subscriber
.
public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
Flow.Subscription subscription = null;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(VideoFrame item) {
log.info("play #{}" , item.getNumber());
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.error("There is an error in video streaming:{}" , throwable.getMessage());
}
@Override
public void onComplete() {
log.error("Video has ended");
}
}
VideoPlayer
подписывается на VideoStreamServer,
затем после успешной подписки вызывается метод VideoPlayer
:: onSubscribe
, и он запрашивает один кадр. VideoPlayer
::onNext получает кадр и запрашивает новый. Количество запрошенных кадров зависит от варианта использования и реализации подписчика
.
Наконец, давайте соберем вещи вместе:
VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());
// submit video frames
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber()
+ " droped because of backpressure"));
return true;
});
}, 0, 1, TimeUnit.MILLISECONDS);
sleep(1000);
5. Реализация с помощью RxJava
RxJava — это Java-реализация ReactiveX . Проект ReactiveX (или Reactive Extensions) направлен на предоставление концепции реактивного программирования. Это комбинация шаблона Observer , шаблона Iterator и функционального программирования.
Последняя основная версия RxJava — 3.x. RxJava поддерживает Reactive Streams начиная с версии 2.x с его базовым классом Flowable
, но это более значительный набор, чем Reactive Streams с несколькими базовыми классами, такими как Flowable
, Observable
, Single
, Completable.
Текучесть
в качестве компонента соответствия реактивного потока представляет собой поток от 0 до N элементов с обработкой противодавления. Flowable
расширяет Publisher
от Reactive Streams. Поэтому многие операторы RxJava принимают Publisher
напрямую и допускают прямое взаимодействие с другими реализациями Reactive Streams.
Теперь давайте создадим наш генератор видеопотока, который представляет собой бесконечный ленивый поток:
Stream<VideoFrame> videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> {
// sleep for 1ms;
return new VideoFrame(videoFrame.getNumber() + 1);
});
Затем мы определяем экземпляр Flowable
для генерации кадров в отдельном потоке:
Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
Важно отметить, что нам достаточно бесконечного потока, но если нам нужен более гибкий способ генерации нашего потока, то Flowable.create
— хороший выбор.
Flowable
.create(new FlowableOnSubscribe<VideoFrame>() {
AtomicLong frame = new AtomicLong();
@Override
public void subscribe(@NonNull FlowableEmitter<VideoFrame> emitter) {
while (true) {
emitter.onNext(new VideoFrame(frame.incrementAndGet()));
//sleep for 1 ms to simualte delay
}
}
}, /* Set Backpressure Strategy Here */)
Затем на следующем шаге VideoPlayer подписывается на этот Flowable и наблюдает за элементами в отдельном потоке.
videoFlowable
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});
И, наконец, настроим стратегию противодавления . Если мы хотим остановить видео в случае потери кадров, значит, мы должны использовать BackpressureOverflowStrategy::ERROR
при заполнении буфера.
Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});
6. Сравнение RxJava и Flow API
Даже в этих двух простых реализациях мы видим, насколько богат API RxJava, особенно для управления буферами, обработки ошибок и стратегии обратного давления. Это дает нам больше возможностей и меньше строк кода благодаря свободному API. Теперь рассмотрим более сложные случаи.
Предположим, что наш плеер не может отображать видеокадры без кодека. Следовательно, с помощью Flow API нам нужно реализовать процессор
для имитации кодека и размещения между сервером и проигрывателем. С RxJava мы можем сделать это с помощью Flowable::flatMap
или Flowable ::map
.
Или давайте представим, что наш плеер также будет транслировать аудио в прямом эфире, поэтому нам нужно объединить потоки видео и аудио от разных издателей. С RxJava мы можем использовать Flowable::combineLatest ,
но с Flow API это непростая задача.
Хотя можно написать собственный процессор
, который подписывается на оба потока и отправляет объединенные данные в наш видеоплеер.
Однако реализация — головная боль.
7. Почему Flow API?
На этом этапе у нас может возникнуть вопрос: какова философия Flow API?
Если мы ищем использование Flow API в JDK, мы можем найти что-то в java.net.http
и jdk.internal.net.http.
Кроме того, мы можем найти адаптеры в проекте реактора или пакете реактивного потока. Например, в org.reactivestreams.FlowAdapters
есть методы для преобразования интерфейсов Flow API в интерфейсы Reactive Stream и наоборот. Поэтому это способствует взаимодействию между Flow API и библиотеками с поддержкой реактивного потока.
Все эти факты помогают нам понять назначение Flow API: он был создан как группа реактивных интерфейсов спецификаций в JDK без передачи третьим лицам. Кроме того, Java ожидает, что Flow API будет принят в качестве стандартных интерфейсов для реактивной спецификации и будет использоваться в JDK или других библиотеках на основе Java, которые реализуют реактивную спецификацию для промежуточного программного обеспечения и утилит.
8. Выводы
В этом руководстве мы познакомились со спецификацией Reactive Stream, API Flow и RxJava.
Кроме того, мы видели практический пример реализации Flow API и RxJava для потокового видео в реальном времени.
Но все аспекты Flow API и RxJava, такие как Flow::Processor
, Flowable::map
и Flowable::flatMap
или стратегии обратного давления, здесь не рассматриваются.
Как всегда, вы найдете полный код руководства на GitHub .