1. Обзор
С введением Spring WebFlux мы получили еще один мощный инструмент для написания реактивных неблокирующих приложений. Хотя использовать эту технологию теперь намного проще, чем раньше, отладка реактивных последовательностей в Spring WebFlux может быть довольно громоздкой .
В этом кратком руководстве мы увидим, как легко регистрировать события в асинхронных последовательностях и как избежать некоторых простых ошибок.
2. Зависимость от Maven
Давайте добавим в наш проект зависимость Spring WebFlux, чтобы мы могли создавать реактивные потоки:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Мы можем получить последнюю зависимость spring-boot-starter-webflux
от Maven Central.
3. Создание реактивного потока
Для начала давайте создадим реактивный поток с помощью Flux
и используем метод log()
для включения ведения журнала:
Flux<Integer> reactiveStream = Flux.range(1, 5).log();
Далее мы подпишемся на него, чтобы использовать сгенерированные значения:
reactiveStream.subscribe();
4. Регистрация реактивного потока
После запуска вышеупомянутого приложения мы видим наш регистратор в действии:
2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()
Мы видим каждое событие, которое произошло на нашем потоке. Было выдано пять значений, а затем поток закрылся с помощью события onComplete() .
5. Сценарий расширенного ведения журнала
Мы можем изменить наше приложение, чтобы увидеть более интересный сценарий. Давайте добавим take()
во Flux
, который будет указывать потоку предоставлять только определенное количество событий:
Flux<Integer> reactiveStream = Flux.range(1, 5).log().take(3);
После выполнения кода мы увидим следующий вывод:
2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()
Как мы видим, функция take()
вызвала отмену потока после отправки трех событий.
Размещение log()
в вашем потоке имеет решающее значение . Давайте посмотрим, как размещение log()
после take()
приведет к другому результату:
Flux<Integer> reactiveStream = Flux.range(1, 5).take(3).log();
И вывод:
2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()
Как мы видим, изменение точки наблюдения изменило вывод. Теперь поток выдал три события, но вместо cancel()
мы видим onComplete()
. Это потому, что мы наблюдаем вывод использования take()
вместо того, что было запрошено этим методом.
6. Заключение
В этой быстрой статье мы увидели, как регистрировать реактивные потоки с помощью встроенного метода log()
.
И, как всегда, исходный код приведенного выше примера можно найти на GitHub .