Перейти к основному содержимому

Обработка исключений в Project Reactor

· 5 мин. чтения

1. Обзор

В этом руководстве мы рассмотрим несколько способов обработки исключений в Project Reactor . Операторы, представленные в примерах кода, определены как в классах Mono , так и в классах Flux . Однако мы сосредоточимся только на методах класса Flux .

2. Зависимости Maven

Начнем с добавления зависимости ядра Reactor :

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId
<version>3.4.9</version>
</dependency>

3. Генерация исключений непосредственно в операторе конвейера

Самый простой способ обработать исключение — бросить его. Если во время обработки элемента потока происходит что-то ненормальное, мы можем сгенерировать Exception с ключевым словом throw , как если бы это было обычное выполнение метода .

Предположим, нам нужно разобрать поток String на Integer s. Если элемент не является числовой строкой , нам нужно сгенерировать исключение .

Обычно для такого преобразования используется оператор карты :

Function<String, Integer> mapper = input -> {
if (input.matches("\\D")) {
throw new NumberFormatException();
} else {
return Integer.parseInt(input);
}
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);

Как мы видим, оператор выдает исключение , если входной элемент недействителен. Когда мы выбрасываем Exception таким образом, Reactor перехватывает его и сигнализирует об ошибке ниже по течению :

StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();

Это решение работает, но не элегантно. Как указано в спецификации Reactive Streams, правило 2.13 , оператор должен возвращаться нормально. Reactor помог нам, преобразовав Exception в сигнал ошибки. Однако мы могли бы добиться большего.

По сути, реактивные потоки полагаются на метод onError для индикации состояния сбоя . В большинстве случаев это условие должно быть вызвано вызовом метода error на Publisher . Использование исключения для этого варианта использования возвращает нас к традиционному программированию.

4. Обработка исключений в дескрипторе оператора

Подобно оператору карты , мы можем использовать оператор дескриптора для обработки элементов в потоке один за другим. Разница в том, что Reactor предоставляет оператору handle выходной приемник , что позволяет нам применять более сложные преобразования.

Давайте обновим наш пример из предыдущего раздела, чтобы использовать оператор дескриптора :

BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
if (input.matches("\\D")) {
sink.error(new NumberFormatException());
} else {
sink.next(Integer.parseInt(input));
}
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);

В отличие от оператора карты , оператор дескриптора получает функционального потребителя , вызываемого один раз для каждого элемента . Этот потребитель имеет два параметра: элемент, поступающий из восходящего потока, и SynchronousSink , формирующий выходные данные для отправки в нисходящий поток.

Если входной элемент является числовой строкой , мы вызываем метод next в приемнике, предоставляя ему целое число , преобразованное из ввода. Если это не числовая String , мы укажем ситуацию, вызвав метод error с объектом Exception .

Обратите внимание, что вызов метода error отменит подписку на восходящий поток и вызовет метод onError на нисходящем потоке . Такое сотрудничество error и onError является стандартным способом обработки Exception в реактивных потоках.

Давайте проверим выходной поток:

StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();

5. Обработка исключений в операторе flatMap

Другой часто используемый оператор, поддерживающий обработку ошибок, — flatMap . Этот оператор преобразует элементы ввода в Publisher , а затем объединяет Publisher в новый поток. Мы можем воспользоваться этими Publisher для обозначения ошибочного состояния.

Давайте попробуем тот же пример, используя flatMap :

Function<String, Publisher<Integer>> mapper = input -> {
if (input.matches("\\D")) {
return Mono.error(new NumberFormatException());
} else {
return Mono.just(Integer.parseInt(input));
}
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);

StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();

Неудивительно, что результат такой же, как и раньше.

Обратите внимание , что единственная разница между handle и flatMap в отношении обработки ошибок заключается в том, что оператор handle вызывает метод error на приемнике, а flatMap вызывает его на Publisher .

Если мы имеем дело с потоком, представленным объектом Flux , мы также можем использовать concatMap для обработки ошибок . Этот метод во многом похож на flatMap , но не поддерживает асинхронную обработку.

6. Как избежать исключения NullPointerException

В этом разделе рассматривается обработка нулевых ссылок, которые часто вызывают NullPointerException s, часто встречающееся исключение в Java. Чтобы избежать этого исключения, мы обычно сравниваем переменную с нулевым значением и направляем выполнение другим способом, если эта переменная на самом деле равна нулю . Заманчиво сделать то же самое в реактивных потоках:

Function<String, Integer> mapper = input -> {
if (input == null) {
return 0;
} else {
return Integer.parseInt(input);
}
};

Мы можем подумать, что NullPointerException не произойдет, потому что мы уже обработали случай, когда входное значение равно null . Однако реальность говорит о другом:

Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);

StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NullPointerException.class)
.verify();

Судя по всему, NullPointerException спровоцировало ошибку ниже по течению, а это означает, что наша проверка на null не сработала .

Чтобы понять, почему это произошло, нам нужно вернуться к спецификации Reactive Streams. Правило 2.13 спецификации гласит, что «вызовы onSubscribe , onNext , onError или onComplete ДОЛЖНЫ возвращаться нормально, за исключением случаев, когда какой-либо предоставленный параметр имеет значение null , и в этом случае он ДОЛЖЕН вызывать исключение java.lang.NullPointerException для вызывающей стороны».

Как того требует спецификация, Reactor выдает исключение NullPointerException , когда значение null достигает функции карты .

Поэтому мы ничего не можем сделать с нулевым значением, когда оно достигает определенного потока. Мы не можем обработать его или преобразовать в ненулевое значение, прежде чем передать его вниз по течению. Таким образом, единственный способ избежать NullPointerException — убедиться, что нулевые значения не попадут в конвейер .

7. Заключение

В этой статье мы рассмотрели обработку исключений в Project Reactor. Мы обсудили пару примеров и разъяснили процесс. Мы также рассмотрели особый случай исключения, которое может произойти при обработке реактивного потока — NullPointerException .

Как обычно, исходный код нашего приложения доступен на GitHub .