1. Обзор
В этой статье мы рассмотрим реактивные потоки Java 9. Проще говоря, мы сможем использовать класс Flow
, который содержит основные строительные блоки для построения логики обработки реактивного потока.
Reactive Streams
— это стандарт для асинхронной обработки потоков с неблокирующим обратным давлением. Эта спецификация определена в Reactive Manifesto ,
и существуют различные ее реализации, например, RxJava
или Akka-Streams.
2. Обзор реактивного API
Чтобы построить Flow
, мы можем использовать три основные абстракции и объединить их в логику асинхронной обработки.
Каждый поток
должен обрабатывать события, которые публикуются в нем экземпляром Publisher ; У издателя
есть один метод — subscribe ().
Если кто-то из подписчиков хочет получать публикуемые им события, ему необходимо подписаться на данного издателя.
Получатель сообщений должен реализовать интерфейс подписчика .
Обычно это конец каждой обработки потока
, потому что его экземпляр не отправляет сообщения дальше.
Мы можем думать о подписчике
как о приемнике.
У него есть четыре метода, которые необходимо переопределить: onSubscribe(), onNext(), onError()
и onComplete().
Мы рассмотрим их в следующем разделе.
Если мы хотим преобразовать входящее сообщение и передать его дальше следующему подписчику,
нам нужно реализовать интерфейс Processor .
Он действует как подписчик
, поскольку получает сообщения, и как издатель
, поскольку обрабатывает эти сообщения и отправляет их для дальнейшей обработки.
3. Публикация и использование сообщений
Допустим, мы хотим создать простой поток,
в котором у нас есть издатель
, публикующий сообщения, и простой подписчик
, потребляющий сообщения по мере их поступления — по одному за раз.
Давайте создадим класс EndSubscriber
. Нам нужно реализовать интерфейс подписчика .
Далее мы переопределим необходимые методы.
Метод onSubscribe()
вызывается перед началом обработки. Экземпляр подписки
передается в качестве аргумента. Это класс, который используется для управления потоком сообщений между подписчиком
и издателем:
public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
Мы также инициализировали пустой список
используемых
элементов , которые будут использоваться в тестах.
Теперь нам нужно реализовать оставшиеся методы из интерфейса подписчика .
Основным методом здесь является onNext() — он вызывается всякий раз, когда издатель
публикует новое сообщение:
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
consumedElements.add(item);
subscription.request(1);
}
Обратите внимание, что когда мы начали подписку в методе onSubscribe()
и когда мы обработали сообщение, нам нужно вызвать метод request()
для подписки
, чтобы сигнализировать о том, что текущий подписчик
готов потреблять больше сообщений.
Наконец, нам нужно реализовать onError()
, которая вызывается всякий раз, когда в процессе обработки будет выдано какое-то исключение, а также onComplete()
, вызываемую при закрытии Publisher
:
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
Давайте напишем тест для Processing Flow.
Мы будем использовать класс SubmissionPublisher
— конструкцию из java.util.concurrent
— которая реализует интерфейс Publisher .
Мы собираемся отправить N
элементов издателю
, которые получит наш EndSubscriber
:
@Test
public void whenSubscribeToIt_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(items)
);
}
Обратите внимание, что мы вызываем метод close()
для экземпляра EndSubscriber.
Он будет вызывать обратный вызов onComplete()
для каждого подписчика
данного издателя.
Запуск этой программы приведет к следующему выводу:
Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done
4. Преобразование сообщений
Допустим, мы хотим построить похожую логику между Publisher
и Subscriber
, но также применить некоторое преобразование.
Мы создадим класс TransformProcessor
, который реализует Processor
и расширяет SubmissionPublisher
, поскольку он будет и издателем
, и подписчиком
.
Мы передадим функцию
, которая будет преобразовывать входные данные в выходные данные:
public class TransformProcessor<T, R>
extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
Давайте теперь напишем быстрый тест с потоком обработки, в котором издатель
публикует элементы String .
Наш TransformProcessor
будет анализировать строку
как целое число
, что означает, что здесь должно произойти преобразование:
@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor
= new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
// when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expectedResult)
);
}
Обратите внимание, что вызов метода close()
в базовом издателе
приведет к вызову метода onComplete()
в TransformProcessor
.
Имейте в виду, что все издатели в цепочке обработки должны быть закрыты таким образом.
5. Контроль спроса на сообщения с помощью подписки
Допустим, мы хотим использовать только первый элемент из подписки, применить некоторую логику и закончить обработку. Для этого мы можем использовать метод request()
.
Давайте изменим наш EndSubscriber
, чтобы он потреблял только N сообщений. Мы будем передавать это число в качестве аргумента конструктора howMuchMessagesConsume
:
public class EndSubscriber<T> implements Subscriber<T> {
private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesConsume.get() > 0) {
subscription.request(1);
}
}
//...
}
Мы можем запрашивать элементы столько, сколько захотим.
Давайте напишем тест, в котором мы хотим использовать только один элемент из данной подписки:
@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expected)
);
}
Хотя издатель
публикует шесть элементов, наш EndSubscriber
будет потреблять только один элемент, поскольку он сигнализирует о необходимости обработки только этого единственного элемента.
Используя метод request()
в подписке,
мы можем реализовать более сложный механизм обратного давления для управления скоростью потребления сообщений.
6. Заключение
В этой статье мы рассмотрели реактивные потоки Java 9.
Мы увидели, как создать поток
обработки, состоящий из издателя
и подписчика.
Мы создали более сложный поток обработки с преобразованием элементов с помощью Processors
.
Наконец, мы использовали подписку
для управления спросом на элементы со стороны подписчика.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.