Перейти к основному содержимому

Введение в ядро реактора

· 11 мин. чтения

1. Введение

Reactor Core — это библиотека Java 8, реализующая модель реактивного программирования. Он построен на основе спецификации Reactive Streams , стандарта для создания реактивных приложений.

На фоне нереактивной Java-разработки переход на реактивную может оказаться довольно сложной кривой обучения. Это становится более сложным при сравнении его с Stream API Java 8, поскольку они могут быть ошибочно приняты за одни и те же высокоуровневые абстракции.

В этой статье мы попытаемся демистифицировать эту парадигму. Мы будем продвигаться по Reactor небольшими шагами, пока не создадим представление о том, как создавать реактивный код, заложив основу для более сложных статей, которые появятся в последующих сериях.

2. Спецификация реактивных потоков

Прежде чем мы рассмотрим Reactor, мы должны взглянуть на спецификацию Reactive Streams. Это то, что реализует Reactor, и это закладывает основу для библиотеки.

По сути, Reactive Streams — это спецификация для асинхронной обработки потоков.

Другими словами, это система, в которой множество событий генерируется и потребляется асинхронно. Подумайте о потоке тысяч обновлений акций в секунду, поступающих в финансовое приложение, и о том, что оно должно своевременно реагировать на эти обновления.

Одной из основных целей этого является решение проблемы обратного давления. Если у нас есть производитель, который отправляет события потребителю быстрее, чем он может их обработать, то в конечном итоге потребитель будет перегружен событиями, и ему не хватит системных ресурсов.

Обратное давление означает, что наш потребитель должен иметь возможность сообщить производителю, сколько данных отправлять, чтобы предотвратить это, и это то, что изложено в спецификации.

3. Зависимости Maven

Прежде чем мы начнем, давайте добавим наши зависимости Maven :

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.16</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.6</version>
</dependency>

Мы также добавляем Logback в качестве зависимости. Это потому, что мы будем регистрировать выходные данные Reactor, чтобы лучше понять поток данных.

4. Создание потока данных

Чтобы приложение было реактивным, первое, что оно должно уметь делать, — это создавать поток данных.

Это может быть что-то вроде примера обновления запасов, который мы приводили ранее. Без этих данных нам не на что было бы реагировать, поэтому это логичный первый шаг.

Reactive Core предоставляет нам два типа данных, которые позволяют нам это сделать.

4.1. Флюс

Первый способ сделать это с помощью Flux . Это поток, который может испускать 0..n элементов. Давайте попробуем создать простой:

Flux<Integer> just = Flux.just(1, 2, 3, 4);

В данном случае у нас есть статический поток из четырех элементов.

4.2. Мононуклеоз

Второй способ сделать это — использовать Mono , представляющий собой поток из 0..1 элементов. Давайте попробуем создать экземпляр:

Mono<Integer> just = Mono.just(1);

Это выглядит и ведет себя почти так же, как Flux , только на этот раз мы ограничены не более чем одним элементом.

4.3. Почему не только Flux?

Прежде чем экспериментировать дальше, стоит подчеркнуть, почему у нас есть эти два типа данных.

Во- первых, следует отметить, что и Flux , и Mono являются реализациями интерфейса Reactive Streams Publisher . Оба класса соответствуют спецификации, и вместо них мы могли бы использовать этот интерфейс:

Publisher<String> just = Mono.just("foo");

Но на самом деле знать эту кардинальность полезно. Это связано с тем, что несколько операций имеют смысл только для одного из двух типов и потому, что они могут быть более выразительными (представьте себе findOne() в репозитории).

5. Подписка на поток

Теперь у нас есть общий обзор того, как создать поток данных, нам нужно подписаться на него, чтобы он испускал элементы.

5.1. Сбор элементов

Давайте воспользуемся методом subscribe() для сбора всех элементов в потоке:

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
.log()
.subscribe(elements::add);

assertThat(elements).containsExactly(1, 2, 3, 4);

Данные не начнут поступать, пока мы не подпишемся. Обратите внимание, что мы также добавили журналирование, это будет полезно, когда мы посмотрим, что происходит за кулисами.

5.2. Поток элементов

Имея ведение журнала, мы можем использовать его для визуализации того, как данные проходят через наш поток:

20:25:19.550 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()

Во-первых, все работает в основном потоке. Не будем вдаваться в подробности по этому поводу, так как мы подробнее рассмотрим параллелизм позже в этой статье. Тем не менее, это делает вещи простыми, поскольку мы можем иметь дело со всем по порядку.

Теперь давайте пройдемся по последовательности, которую мы записали один за другим:

  1. onSubscribe() — вызывается, когда мы подписываемся на наш поток
  2. request(unbounded) — Когда мы вызываем subscribe , за кулисами мы создаем Subscription . Эта подписка запрашивает элементы из потока. В этом случае по умолчанию используется значение unbounded, что означает, что он запрашивает каждый доступный элемент.
  3. onNext() — вызывается для каждого отдельного элемента.
  4. onComplete() — вызывается последним, после получения последнего элемента. На самом деле также есть onError() , который будет вызываться, если есть исключение, но в этом случае нет

Это поток, изложенный в интерфейсе подписчика как часть спецификации реактивных потоков, и на самом деле это то, что было создано за кулисами в нашем вызове onSubscribe(). Это полезный метод, но чтобы лучше понять, что происходит, давайте напрямую предоставим интерфейс подписчика :

Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
elements.add(integer);
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

Мы видим, что каждый возможный этап в приведенном выше потоке соответствует методу в реализации подписчика . Так уж получилось, что Flux предоставил нам вспомогательный метод для уменьшения этой многословности.

5.3. Сравнение с потоками Java 8

По-прежнему может показаться, что у нас есть что-то синонимично сбору потока Java 8:

List<Integer> collected = Stream.of(1, 2, 3, 4)
.collect(toList());

Только не мы.

Основное отличие состоит в том, что Reactive — это модель push, тогда как потоки Java 8 — модель pull. При реактивном подходе события передаются подписчикам по мере их поступления.

Следующее, на что следует обратить внимание, это оператор терминала Streams — это просто терминал, извлекающий все данные и возвращающий результат. С Reactive мы могли бы иметь бесконечный поток, поступающий из внешнего ресурса, с несколькими подписчиками, подключенными и удаленными на разовой основе. Мы также можем делать такие вещи, как объединение потоков, дросселирование потоков и применение обратного давления, которые мы рассмотрим далее.

6. Противодавление

Следующее, что мы должны рассмотреть, это обратное давление. В нашем примере подписчик говорит производителю отправить каждый элемент сразу. Это может стать непосильным для подписчика, потребляя все его ресурсы.

Обратное давление — это когда нисходящий поток может сказать восходящему потоку отправлять ему меньше данных, чтобы предотвратить его перегрузку .

Мы можем изменить нашу реализацию подписчика , чтобы применить обратное давление. Давайте скажем восходящему потоку отправлять только два элемента за раз, используя request() :

Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;

@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}

@Override
public void onNext(Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

Теперь, если мы снова запустим наш код, мы увидим, что вызывается запрос (2) , за которым следуют два вызова onNext() , а затем снова запрос (2) .

23:31:15.395 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()

По сути, это реактивное тяговое противодавление. Мы просим апстрим отправлять только определенное количество элементов и только тогда, когда мы готовы.

Если мы представим, что нам транслируют твиты из Твиттера, то решение о том, что делать, будет зависеть от восходящего потока. Если бы твиты поступали, но запросов от нисходящего потока не было, то вышестоящий мог бы отбрасывать элементы, сохранять их в буфере или применять другую стратегию.

7. Работа с потоком

Мы также можем выполнять операции с данными в нашем потоке, реагируя на события по своему усмотрению.

7.1. Отображение данных в потоке

Простая операция, которую мы можем выполнить, — это применение преобразования. В этом случае давайте просто удвоим все числа в нашем потоке:

Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribe(elements::add);

map() будет применяться при вызове onNext() .

7.2. Объединение двух потоков

Затем мы можем сделать вещи более интересными, объединив другой поток с этим. Давайте попробуем это с помощью функции zip() :

Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);

assertThat(elements).containsExactly(
"First Flux: 2, Second Flux: 0",
"First Flux: 4, Second Flux: 1",
"First Flux: 6, Second Flux: 2",
"First Flux: 8, Second Flux: 3");

Здесь мы создаем еще один Flux , который продолжает увеличиваться на единицу и транслирует его вместе с нашим исходным. Мы можем увидеть, как они работают вместе, изучив журналы:

20:04:38.064 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()

Обратите внимание, что теперь у нас есть одна подписка на каждый Flux . Вызовы onNext() также чередуются, поэтому индекс каждого элемента в потоке будет совпадать, когда мы применяем функцию zip() .

8. Горячие потоки

В настоящее время мы сосредоточились в основном на холодных потоках. Это статические потоки фиксированной длины, с которыми легко работать. Более реалистичным вариантом использования реактивного может быть что-то, что происходит бесконечно.

Например, у нас может быть поток движений мыши, на который нужно постоянно реагировать, или лента Twitter. Эти типы потоков называются горячими потоками, так как они всегда выполняются и на них можно подписаться в любой момент времени, пропуская начало данных.

8.1. Создание ConnectableFlux

Одним из способов создания горячего потока является преобразование холодного потока в один. Давайте создадим Flux , который длится вечно, выводя результаты на консоль, что бы имитировало бесконечный поток данных, поступающих с внешнего ресурса:

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.publish();

При вызове publish() мы получаем ConnectableFlux . Это означает, что вызов subscribe() не приведет к тому, что он начнет излучать, что позволит нам добавить несколько подписок:

publish.subscribe(System.out::println);        
publish.subscribe(System.out::println);

Если мы попытаемся запустить этот код, ничего не произойдет. Пока мы не вызовем connect(), Flux не начнет излучать:

publish.connect();

8.2. Дросселирование

Если мы запустим наш код, наша консоль будет перегружена журналированием. Это моделирует ситуацию, когда нашим потребителям передается слишком много данных. Давайте попробуем обойти это с помощью дросселирования:

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.sample(ofSeconds(2))
.publish();

Здесь мы ввели метод sample() с интервалом в две секунды. Теперь значения будут передаваться нашему подписчику только каждые две секунды, а это означает, что консоль будет намного менее беспокойной.

Конечно, существует несколько стратегий уменьшения количества данных, отправляемых вниз по течению, таких как управление окнами и буферизация, но они не будут рассматриваться в этой статье.

9. Параллелизм

Все приведенные выше примеры в настоящее время выполняются в основном потоке. Однако мы можем контролировать, в каком потоке выполняется наш код, если захотим. Интерфейс Scheduler обеспечивает абстракцию вокруг асинхронного кода, для которого нам предоставлено множество реализаций. Давайте попробуем подписаться на другой поток, отличный от основного:

Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);

Планировщик Parallel заставит нашу подписку выполняться в другом потоке, в чем мы можем убедиться, просмотрев журналы. Мы видим, что первая запись поступает из основного потока, а Flux работает в другом потоке с именем parallel-1 .

20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()

Параллелизм становится более интересным, и нам стоит изучить его в другой статье.

10. Заключение

В этой статье мы дали общий сквозной обзор Reactive Core. Мы объяснили, как мы можем публиковать потоки и подписываться на них, применять обратное давление, работать с потоками, а также асинхронно обрабатывать данные. Мы надеемся, что это должно заложить основу для написания реактивных приложений.

В последующих статьях этой серии будут рассмотрены более сложные параллелизм и другие концепции реагирования. Также есть еще одна статья, посвященная Reactor с Spring .

Исходный код нашего приложения доступен на GitHub ; это проект Maven, который должен работать как есть.