Перейти к основному содержимому

Регистрация реактивной последовательности

· 3 мин. чтения

1. Обзор

С введением Spring WebFlux мы получили еще один мощный инструмент для написания реактивных неблокирующих приложений. Хотя использовать эту технологию теперь намного проще, чем раньше, отладка реактивных последовательностей в Spring WebFlux может быть довольно громоздкой .

В этом кратком руководстве мы увидим, как легко регистрировать события в асинхронных последовательностях и как избежать некоторых простых ошибок.

2. Зависимость от Maven

Давайте добавим в наш проект зависимость Spring WebFlux, чтобы мы могли создавать реактивные потоки:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Мы можем получить последнюю зависимость spring-boot-starter-webflux от Maven Central.

3. Создание реактивного потока

Для начала давайте создадим реактивный поток с помощью Flux и используем метод log() для включения ведения журнала:

Flux<Integer> reactiveStream = Flux.range(1, 5).log();

Далее мы подпишемся на него, чтобы использовать сгенерированные значения:

reactiveStream.subscribe();

4. Регистрация реактивного потока

После запуска вышеупомянутого приложения мы видим наш регистратор в действии:

2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()

Мы видим каждое событие, которое произошло на нашем потоке. Было выдано пять значений, а затем поток закрылся с помощью события onComplete() .

5. Сценарий расширенного ведения журнала

Мы можем изменить наше приложение, чтобы увидеть более интересный сценарий. Давайте добавим take() во Flux , который будет указывать потоку предоставлять только определенное количество событий:

Flux<Integer> reactiveStream = Flux.range(1, 5).log().take(3);

После выполнения кода мы увидим следующий вывод:

2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()

Как мы видим, функция take() вызвала отмену потока после отправки трех событий.

Размещение log() в вашем потоке имеет решающее значение . Давайте посмотрим, как размещение log() после take() приведет к другому результату:

Flux<Integer> reactiveStream = Flux.range(1, 5).take(3).log();

И вывод:

2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()

Как мы видим, изменение точки наблюдения изменило вывод. Теперь поток выдал три события, но вместо cancel() мы видим onComplete() . Это потому, что мы наблюдаем вывод использования take() вместо того, что было запрошено этим методом.

6. Заключение

В этой быстрой статье мы увидели, как регистрировать реактивные потоки с помощью встроенного метода log() .

И, как всегда, исходный код приведенного выше примера можно найти на GitHub .