Перейти к основному содержимому

Реактивные потоки Java 9

· 6 мин. чтения

1. Обзор

В этой статье мы рассмотрим реактивные потоки Java 9. Проще говоря, мы сможем использовать класс Flow , который содержит основные строительные блоки для построения логики обработки реактивного потока.

Reactive Streams — это стандарт для асинхронной обработки потоков с неблокирующим обратным давлением. Эта спецификация определена в Reactive Manifesto , и существуют различные ее реализации, например, RxJava или Akka-Streams.

2. Обзор реактивного API

Чтобы построить Flow , мы можем использовать три основные абстракции и объединить их в логику асинхронной обработки.

Каждый поток должен обрабатывать события, которые публикуются в нем экземпляром Publisher ; У издателя есть один метод — subscribe ().

Если кто-то из подписчиков хочет получать публикуемые им события, ему необходимо подписаться на данного издателя.

Получатель сообщений должен реализовать интерфейс подписчика . Обычно это конец каждой обработки потока , потому что его экземпляр не отправляет сообщения дальше.

Мы можем думать о подписчике как о приемнике. У него есть четыре метода, которые необходимо переопределить: onSubscribe(), onNext(), onError() и onComplete(). Мы рассмотрим их в следующем разделе.

Если мы хотим преобразовать входящее сообщение и передать его дальше следующему подписчику, нам нужно реализовать интерфейс Processor . Он действует как подписчик , поскольку получает сообщения, и как издатель , поскольку обрабатывает эти сообщения и отправляет их для дальнейшей обработки.

3. Публикация и использование сообщений

Допустим, мы хотим создать простой поток, в котором у нас есть издатель , публикующий сообщения, и простой подписчик , потребляющий сообщения по мере их поступления — по одному за раз.

Давайте создадим класс EndSubscriber . Нам нужно реализовать интерфейс подписчика . Далее мы переопределим необходимые методы.

Метод onSubscribe() вызывается перед началом обработки. Экземпляр подписки передается в качестве аргумента. Это класс, который используется для управления потоком сообщений между подписчиком и издателем:

public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}

Мы также инициализировали пустой список используемых элементов , которые будут использоваться в тестах.

Теперь нам нужно реализовать оставшиеся методы из интерфейса подписчика . Основным методом здесь является onNext() — он вызывается всякий раз, когда издатель публикует новое сообщение:

@Override
public void onNext(T item) {
System.out.println("Got : " + item);
consumedElements.add(item);
subscription.request(1);
}

Обратите внимание, что когда мы начали подписку в методе onSubscribe() и когда мы обработали сообщение, нам нужно вызвать метод request() для подписки , чтобы сигнализировать о том, что текущий подписчик готов потреблять больше сообщений.

Наконец, нам нужно реализовать onError() , которая вызывается всякий раз, когда в процессе обработки будет выдано какое-то исключение, а также onComplete() , вызываемую при закрытии Publisher :

@Override
public void onError(Throwable t) {
t.printStackTrace();
}

@Override
public void onComplete() {
System.out.println("Done");
}

Давайте напишем тест для Processing Flow. Мы будем использовать класс SubmissionPublisher — конструкцию из java.util.concurrent — которая реализует интерфейс Publisher .

Мы собираемся отправить N элементов издателю , которые получит наш EndSubscriber :

@Test
public void whenSubscribeToIt_thenShouldConsumeAll()
throws InterruptedException {

// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");

// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();

// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(items)
);
}

Обратите внимание, что мы вызываем метод close() для экземпляра EndSubscriber. Он будет вызывать обратный вызов onComplete() для каждого подписчика данного издателя.

Запуск этой программы приведет к следующему выводу:

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done

4. Преобразование сообщений

Допустим, мы хотим построить похожую логику между Publisher и Subscriber , но также применить некоторое преобразование.

Мы создадим класс TransformProcessor , который реализует Processor и расширяет SubmissionPublisher , поскольку он будет и издателем , и подписчиком .

Мы передадим функцию , которая будет преобразовывать входные данные в выходные данные:

public class TransformProcessor<T, R> 
extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {

private Function<T, R> function;
private Flow.Subscription subscription;

public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}

@Override
public void onError(Throwable t) {
t.printStackTrace();
}

@Override
public void onComplete() {
close();
}
}

Давайте теперь напишем быстрый тест с потоком обработки, в котором издатель публикует элементы String .

Наш TransformProcessor будет анализировать строку как целое число , что означает, что здесь должно произойти преобразование:

@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
throws InterruptedException {

// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor
= new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);

// when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();

// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expectedResult)
);
}

Обратите внимание, что вызов метода close() в базовом издателе приведет к вызову метода onComplete() в TransformProcessor .

Имейте в виду, что все издатели в цепочке обработки должны быть закрыты таким образом.

5. Контроль спроса на сообщения с помощью подписки

Допустим, мы хотим использовать только первый элемент из подписки, применить некоторую логику и закончить обработку. Для этого мы можем использовать метод request() .

Давайте изменим наш EndSubscriber , чтобы он потреблял только N сообщений. Мы будем передавать это число в качестве аргумента конструктора howMuchMessagesConsume :

public class EndSubscriber<T> implements Subscriber<T> {

private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();

public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesConsume.get() > 0) {
subscription.request(1);
}
}
//...

}

Мы можем запрашивать элементы столько, сколько захотим.

Давайте напишем тест, в котором мы хотим использовать только один элемент из данной подписки:

@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
throws InterruptedException {

// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");

// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();

// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expected)
);
}

Хотя издатель публикует шесть элементов, наш EndSubscriber будет потреблять только один элемент, поскольку он сигнализирует о необходимости обработки только этого единственного элемента.

Используя метод request() в подписке, мы можем реализовать более сложный механизм обратного давления для управления скоростью потребления сообщений.

6. Заключение

В этой статье мы рассмотрели реактивные потоки Java 9.

Мы увидели, как создать поток обработки, состоящий из издателя и подписчика. Мы создали более сложный поток обработки с преобразованием элементов с помощью Processors .

Наконец, мы использовали подписку для управления спросом на элементы со стороны подписчика.

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.