1. Обзор
В этом руководстве мы рассмотрим несколько способов обработки исключений в Project Reactor . Операторы, представленные в примерах кода, определены как в классах Mono
, так и в классах Flux .
Однако мы сосредоточимся только на методах класса Flux
.
2. Зависимости Maven
Начнем с добавления зависимости ядра Reactor :
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId
<version>3.4.9</version>
</dependency>
3. Генерация исключений непосредственно в операторе конвейера
Самый простой способ обработать исключение
— бросить его. Если во время обработки элемента потока происходит что-то ненормальное, мы можем сгенерировать Exception
с ключевым словом throw
, как если бы это было обычное выполнение метода .
Предположим, нам нужно разобрать поток String
на Integer
s. Если элемент не является числовой строкой
, нам нужно сгенерировать исключение
.
Обычно для такого преобразования используется оператор карты :
Function<String, Integer> mapper = input -> {
if (input.matches("\\D")) {
throw new NumberFormatException();
} else {
return Integer.parseInt(input);
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);
Как мы видим, оператор выдает исключение
, если входной элемент недействителен. Когда мы выбрасываем Exception
таким образом, Reactor перехватывает его и сигнализирует об ошибке ниже по течению :
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
Это решение работает, но не элегантно. Как указано в спецификации Reactive Streams, правило 2.13 , оператор должен возвращаться нормально. Reactor помог нам, преобразовав Exception
в сигнал ошибки. Однако мы могли бы добиться большего.
По сути, реактивные потоки полагаются на метод onError
для индикации состояния сбоя . В большинстве случаев это условие должно быть вызвано вызовом метода error
на Publisher
. Использование исключения
для этого варианта использования возвращает нас к традиционному программированию.
4. Обработка исключений в дескрипторе
оператора
Подобно оператору карты
, мы можем использовать оператор дескриптора
для обработки элементов в потоке один за другим. Разница в том, что Reactor предоставляет оператору handle
выходной приемник , что позволяет нам применять более сложные преобразования.
Давайте обновим наш пример из предыдущего раздела, чтобы использовать оператор дескриптора :
BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
if (input.matches("\\D")) {
sink.error(new NumberFormatException());
} else {
sink.next(Integer.parseInt(input));
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);
В отличие от оператора карты
, оператор дескриптора получает функционального потребителя , вызываемого один раз для
каждого элемента . Этот потребитель имеет два параметра: элемент, поступающий из восходящего потока, и SynchronousSink
, формирующий выходные данные для отправки в нисходящий поток.
Если входной элемент является числовой строкой
, мы вызываем метод next
в приемнике, предоставляя ему целое число
, преобразованное из ввода. Если это не числовая String
, мы укажем ситуацию, вызвав метод error
с объектом Exception .
Обратите внимание, что вызов метода error
отменит подписку на восходящий поток и вызовет метод onError
на нисходящем потоке . Такое сотрудничество error
и onError
является стандартным способом обработки Exception
в реактивных потоках.
Давайте проверим выходной поток:
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
5. Обработка исключений в операторе flatMap
Другой часто используемый оператор, поддерживающий обработку ошибок, — flatMap
. Этот оператор преобразует элементы ввода в Publisher
, а затем объединяет Publisher
в новый поток. Мы можем воспользоваться этими Publisher
для обозначения ошибочного состояния.
Давайте попробуем тот же пример, используя flatMap
:
Function<String, Publisher<Integer>> mapper = input -> {
if (input.matches("\\D")) {
return Mono.error(new NumberFormatException());
} else {
return Mono.just(Integer.parseInt(input));
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
Неудивительно, что результат такой же, как и раньше.
Обратите внимание , что единственная разница между handle
и flatMap
в отношении обработки ошибок заключается в том, что оператор handle вызывает метод
error
на приемнике, а flatMap
вызывает его на Publisher
.
Если мы имеем дело с потоком, представленным объектом Flux
, мы также можем использовать concatMap
для обработки ошибок . Этот метод во многом похож на flatMap
, но не поддерживает асинхронную обработку.
6. Как избежать исключения NullPointerException
В этом разделе рассматривается обработка нулевых
ссылок, которые часто вызывают NullPointerException
s, часто встречающееся исключение
в Java. Чтобы избежать этого исключения, мы обычно сравниваем переменную с нулевым
значением и направляем выполнение другим способом, если эта переменная на самом деле равна нулю
. Заманчиво сделать то же самое в реактивных потоках:
Function<String, Integer> mapper = input -> {
if (input == null) {
return 0;
} else {
return Integer.parseInt(input);
}
};
Мы можем подумать, что NullPointerException
не произойдет, потому что мы уже обработали случай, когда входное значение равно null
. Однако реальность говорит о другом:
Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NullPointerException.class)
.verify();
Судя по всему, NullPointerException
спровоцировало ошибку ниже по течению, а это означает, что наша проверка на null
не сработала .
Чтобы понять, почему это произошло, нам нужно вернуться к спецификации Reactive Streams. Правило 2.13 спецификации гласит, что «вызовы onSubscribe
, onNext
, onError
или onComplete
ДОЛЖНЫ возвращаться нормально, за исключением случаев, когда какой-либо предоставленный параметр имеет значение null
, и в этом случае он ДОЛЖЕН вызывать исключение java.lang.NullPointerException
для вызывающей стороны».
Как того требует спецификация, Reactor выдает исключение NullPointerException
, когда значение null
достигает функции карты
.
Поэтому мы ничего не можем сделать с нулевым
значением, когда оно достигает определенного потока. Мы не можем обработать его или преобразовать в ненулевое
значение, прежде чем передать его вниз по течению. Таким образом, единственный способ избежать NullPointerException
— убедиться, что нулевые
значения не попадут в конвейер .
7. Заключение
В этой статье мы рассмотрели обработку исключений
в Project Reactor. Мы обсудили пару примеров и разъяснили процесс. Мы также рассмотрели особый случай исключения, которое может произойти при обработке реактивного потока — NullPointerException
.
Как обычно, исходный код нашего приложения доступен на GitHub .