Перейти к основному содержимому

Отладка реактивных потоков в Java

· 7 мин. чтения

1. Обзор

Отладка реактивных потоков , вероятно, является одной из основных проблем, с которыми нам придется столкнуться, когда мы начнем использовать эти структуры данных.

И, учитывая, что Reactive Streams набирает популярность в последние годы, полезно знать, как мы можем эффективно выполнить эту задачу.

Давайте начнем с настройки проекта с использованием реактивного стека, чтобы понять, почему это часто вызывает проблемы.

2. Сценарий с ошибками

Мы хотим смоделировать реальный сценарий, в котором запущено несколько асинхронных процессов и где мы внесли в код некоторые дефекты, которые в конечном итоге вызовут исключения.

Чтобы понять общую картину, упомянем, что наше приложение будет потреблять и обрабатывать потоки простых объектов Foo , которые содержат только id , formattedName и поле количества . Для более подробной информации смотрите проект здесь .

2.1. Анализ вывода журнала

Теперь давайте рассмотрим фрагмент и вывод, который он генерирует при обнаружении необработанной ошибки:

public void processFoo(Flux<Foo> flux) {
flux.map(FooNameHelper::concatFooName)
.map(FooNameHelper::substringFooName)
.map(FooReporter::reportResult)
.subscribe();
}

public void processFooInAnotherScenario(Flux<Foo> flux) {
flux.map(FooNameHelper::substringFooName)
.map(FooQuantityHelper::divideFooQuantity)
.subscribe();
}

Запустив наше приложение на несколько секунд, мы поймем, что время от времени оно регистрирует исключения.

Присмотревшись к одной из ошибок, мы найдем что-то похожее на это:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
at j.l.String.substring(String.java:1963)
at com.foreach.debugging.consumer.service.FooNameHelper
.lambda$1(FooNameHelper.java:38)
at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
at j.u.c.FutureTask.run(FutureTask.java:266)
at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
.access$201(ScheduledThreadPoolExecutor.java:180)
at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
.run(ScheduledThreadPoolExecutor.java:293)
at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at j.l.Thread.run(Thread.java:748)

Основываясь на основной причине и заметив класс FooNameHelper , упомянутый в трассировке стека, мы можем предположить, что в некоторых случаях наши объекты Foo обрабатываются со значением formattedName , которое короче, чем ожидалось.

Конечно, это всего лишь упрощенный случай, и решение кажется довольно очевидным.

Но давайте представим, что это реальный сценарий, когда само исключение не помогает нам решить проблему без какой-либо контекстной информации.

Исключение было вызвано как часть метода processFoo или метода processFooInAnotherScenario ?

Повлияли ли другие предыдущие шаги на поле formattedName до перехода на этот этап?

Запись в журнале не поможет нам разобраться в этих вопросах.

Что еще хуже, иногда исключение даже не вызывается из нашей функциональности.

Например, представьте, что мы полагаемся на реактивный репозиторий для сохранения наших объектов Foo . Если в этот момент возникает ошибка, мы можем даже не знать, с чего начать отладку нашего кода.

Нам нужны инструменты для эффективной отладки реактивных потоков.

3. Использование сеанса отладки

Одним из способов выяснить, что происходит с нашим приложением, является запуск сеанса отладки с использованием нашей любимой IDE.

Нам нужно будет установить пару условных точек останова и проанализировать поток данных при выполнении каждого шага в потоке.

Действительно, это может быть трудоемкой задачей, особенно если у нас запущено много реактивных процессов и совместно используются ресурсы.

Кроме того, во многих случаях мы не можем начать сеанс отладки по соображениям безопасности.

4. Регистрация информации с помощью метода doOnErrorMethod или с использованием параметра Subscribe

Иногда мы можем добавить полезную контекстную информацию, указав Consumer в качестве второго параметра метода subscribe :

public void processFoo(Flux<Foo> flux) {

// ...

flux.subscribe(foo -> {
logger.debug("Finished processing Foo with Id {}", foo.getId());
}, error -> {
logger.error(
"The following error happened on processFoo method!",
error);
});
}

Примечание. Стоит отметить, что если нам не нужно выполнять дальнейшую обработку метода подписки , мы можем связать функцию doOnError с нашим издателем:

flux.doOnError(error -> {
logger.error("The following error happened on processFoo method!", error);
}).subscribe();

Теперь у нас будет некоторое руководство о том, откуда может исходить ошибка, хотя у нас все еще мало информации о фактическом элементе, сгенерировавшем исключение.

5. Активация конфигурации глобальной отладки Reactor

Библиотека Reactor предоставляет класс Hooks , который позволяет нам настраивать поведение операторов Flux и Mono .

Просто добавив следующий оператор, наше приложение будет обрабатывать вызовы методов издателя, обертывать конструкцию оператора и захватывать трассировку стека :

Hooks.onOperatorDebug();

После активации режима отладки наши журналы исключений будут содержать некоторую полезную информацию:

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
- The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
at j.l.String.substring(String.java:1963)
at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
...
at j.l.Thread.run(Thread.java:748)
Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
reactor.core.publisher.Flux.map(Flux.java:5653)
c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
c.d.b.c.s.FooService.processFoo(FooService.java:24)
c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
o.s.s.s.DelegatingErrorHandlingRunnable
.run(DelegatingErrorHandlingRunnable.java:54)
o.u.c.Executors$RunnableAdapter.call(Executors.java:511)
o.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
|_ Flux.map ⇢ c.d.b.c.s.FooNameHelper
.substringFooName(FooNameHelper.java:32)
|_ Flux.map ⇢ c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)

Как мы видим, первый раздел остается относительно таким же, но следующие разделы содержат информацию о:

  1. Трассировка сборки издателя — здесь мы можем подтвердить, что ошибка сначала была сгенерирована в методе processFoo .
  2. Операторы, обнаружившие ошибку после ее первого запуска, с классом пользователя, к которому они были привязаны.

Примечание. В этом примере, главным образом для того, чтобы это было ясно, мы добавляем операции для разных классов.

Мы можем включить или выключить режим отладки в любое время, но это не повлияет на уже созданные объекты Flux и Mono .

5.1. Выполнение операторов в разных потоках

Еще один аспект, о котором следует помнить, заключается в том, что трассировка сборки создается правильно, даже если в потоке работают разные потоки.

Давайте посмотрим на следующий пример:

public void processFoo(Flux<Foo> flux) {
flux.publishOn(Schedulers.newSingle("foo-thread"))
// ...
.publishOn(Schedulers.newSingle("bar-thread"))
.map(FooReporter::reportResult)
.subscribeOn(Schedulers.newSingle("starter-thread"))
.subscribe();
}

Теперь, если мы проверим журналы, мы поймем, что в этом случае первый раздел может немного измениться, но последние два остаются практически такими же.

Первая часть — это трассировка стека потока, поэтому она покажет только операции, выполняемые конкретным потоком.

Как мы видели, это не самый важный раздел при отладке приложения, поэтому это изменение допустимо.

6. Активация вывода отладки для одного процесса

Инструментирование и создание трассировки стека в каждом реактивном процессе требует больших затрат.

Таким образом, мы должны применять первый подход только в критических случаях .

В любом случае, Reactor предоставляет способ включить режим отладки для отдельных важных процессов, который потребляет меньше памяти .

Имеем в виду оператора КПП :

public void processFoo(Flux<Foo> flux) {

// ...

flux.checkpoint("Observed error on processFoo", true)
.subscribe();
}

Обратите внимание, что таким образом трассировка сборки будет регистрироваться на этапе контрольной точки:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
...
Assembly trace from producer [reactor.core.publisher.FluxMap],
described as [Observed error on processFoo] :
r.c.p.Flux.checkpoint(Flux.java:3096)
c.b.d.c.s.FooService.processFoo(FooService.java:26)
c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
|_ Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

Мы должны реализовать метод контрольной точки ближе к концу реактивной цепочки.

В противном случае оператор не сможет наблюдать за ошибками, происходящими ниже по течению.

Также отметим, что библиотека предлагает перегруженный метод. Мы можем избежать:

  • указание описания наблюдаемой ошибки, если мы используем опцию no-args
  • генерация заполненной трассировки стека (что является наиболее дорогостоящей операцией), предоставляя только пользовательское описание

7. Регистрация последовательности элементов

Наконец, издатели Reactor предлагают еще один метод, который потенциально может пригодиться в некоторых случаях.

Вызывая метод log в нашей реактивной цепочке, приложение будет логировать каждый элемент в потоке с тем состоянием, которое у него есть на данном этапе .

Попробуем на нашем примере:

public void processFoo(Flux<Foo> flux) {
flux.map(FooNameHelper::concatFooName)
.map(FooNameHelper::substringFooName)
.log();
.map(FooReporter::reportResult)
.doOnError(error -> {
logger.error("The following error happened on processFoo method!", error);
})
.subscribe();
}

И проверьте логи:

INFO  reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO reactor.Flux.OnAssembly.1 - request(unbounded)
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService
- The following error happened on processFoo method!
...

Мы можем легко увидеть состояние каждого объекта Foo на этом этапе и то, как фреймворк отменяет поток при возникновении исключения.

Конечно, такой подход тоже затратный, и нам придется использовать его в меру.

8. Заключение

Мы можем потратить много времени и усилий на устранение неполадок, если не знаем инструментов и механизмов для правильной отладки нашего приложения.

Это особенно верно, если мы не привыкли работать с реактивными и асинхронными структурами данных, и нам нужна дополнительная помощь, чтобы понять, как все работает.

Как всегда, полный пример доступен в репозитории GitHub .