Перейти к основному содержимому

Обработка ошибок с помощью Spring AMQP

· 11 мин. чтения

1. Введение

Асинхронный обмен сообщениями — это тип слабосвязанной распределенной связи, который становится все более популярным для реализации архитектур, управляемых событиями . К счастью, Spring Framework предоставляет проект Spring AMQP , позволяющий нам создавать решения для обмена сообщениями на основе AMQP.

С другой стороны, работа с ошибками в таких средах может оказаться нетривиальной задачей . Итак, в этом уроке мы рассмотрим различные стратегии обработки ошибок.

2. Настройка среды

В этом руководстве мы будем использовать RabbitMQ , реализующий стандарт AMQP. Кроме того, Spring AMQP предоставляет модуль spring-rabbit , который делает интеграцию действительно простой.

Давайте запустим RabbitMQ как отдельный сервер. Мы запустим его в контейнере Docker , выполнив следующую команду:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

Подробную информацию о конфигурации и настройке зависимостей проекта см. в нашей статье Spring AMQP .

3. Сценарий отказа

Обычно в системах обмена сообщениями может возникать больше типов ошибок по сравнению с монолитными или однопакетными приложениями из-за их распределенного характера.

Можно выделить некоторые типы исключений:

  • Связанные с сетью или вводом -выводом — общие сбои сетевых подключений и операций ввода-вывода.
  • Связанные с протоколом или инфраструктурой — ошибки, которые обычно представляют собой неправильную настройку инфраструктуры обмена сообщениями.
  • Связанные с брокером — сбои, которые предупреждают о неправильной конфигурации между клиентами и брокером AMQP. Например, достижение определенных пределов или порога, проверка подлинности или неверная конфигурация политик.
  • Связанные с приложениями и сообщениями — исключения, которые обычно указывают на нарушение некоторых бизнес-правил или правил приложений.

Конечно, этот список сбоев не является исчерпывающим, но содержит наиболее распространенные типы ошибок.

Следует отметить, что Spring AMQP обрабатывает связанные с подключением и низкоуровневые проблемы из коробки, например, применяя политики повторных попыток или повторной очереди . Кроме того, большинство отказов и ошибок преобразуются в AmqpException или один из его подклассов.

В следующих разделах мы в основном сосредоточимся на ошибках, характерных для приложений, и ошибках высокого уровня, а затем рассмотрим глобальные стратегии обработки ошибок.

4. Настройка проекта

Теперь давайте определим простую очередь и конфигурацию обмена для начала:

public static final String QUEUE_MESSAGES = "foreach-messages-queue";
public static final String EXCHANGE_MESSAGES = "foreach-messages-exchange";

@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.build();
}

@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}

@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

Далее создадим простого производителя:

public void sendMessage() {
rabbitTemplate
.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}

И, наконец, потребитель, выбрасывающий исключение:

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
throw new BusinessException();
}

По умолчанию все ошибочные сообщения будут немедленно помещены в начало целевой очереди снова и снова.

Давайте запустим наше тестовое приложение, выполнив следующую команду Maven:

mvn spring-boot:run -Dstart-class=com.foreach.springamqp.errorhandling.ErrorHandlingApp

Теперь мы должны увидеть аналогичный результирующий вывод:

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
Caused by: com.foreach.springamqp.errorhandling.errorhandler.BusinessException: null

Следовательно, по умолчанию мы будем видеть в выводе бесконечное количество таких сообщений.

Чтобы изменить это поведение, у нас есть два варианта:

  • Установите для параметра default-requeue-rejected значение false на стороне слушателя – spring.rabbitmq.listener.simple.default-requeue-rejected=false
  • Создайте исключение AmqpRejectAndDontRequeueException это может быть полезно для сообщений, которые в будущем не будут иметь смысла, поэтому их можно будет отбросить.

Теперь давайте узнаем, как более разумно обрабатывать ошибочные сообщения.

5. Очередь недоставленных писем

Очередь недоставленных писем (DLQ) — это очередь, в которой хранятся недоставленные или не доставленные сообщения . DLQ позволяет нам обрабатывать ошибочные или неверные сообщения, отслеживать шаблоны отказов и восстанавливаться после исключений в системе.

Что еще более важно, это помогает предотвратить бесконечные циклы в очередях, которые постоянно обрабатывают неверные сообщения и снижают производительность системы.

В целом существует две основные концепции: обмен недоставленными письмами (DLX) и сама очередь недоставленных сообщений (DLQ). По сути, DLX — это обычный обмен, который мы можем определить как один из распространенных типов : прямой , топик или разветвленный .

Очень важно понимать, что производитель ничего не знает об очередях. Он знает только об обмене, и все созданные сообщения маршрутизируются в соответствии с конфигурацией обмена и ключом маршрутизации сообщений .

Теперь давайте посмотрим, как обрабатывать исключения, применяя подход Dead Letter Queue.

5.1. Базовая конфигурация

Чтобы настроить DLQ, нам нужно указать дополнительные аргументы при определении нашей очереди:

@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
.build();
}

@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

В приведенном выше примере мы использовали два дополнительных аргумента: x-dead-letter-exchange и x-dead-letter-routing-key . Пустое строковое значение для опции x-dead-letter-exchange указывает брокеру использовать обмен по умолчанию .

Второй аргумент так же важен, как и установка ключей маршрутизации для простых сообщений. Эта опция изменяет первоначальный ключ маршрутизации сообщения для дальнейшей маршрутизации DLX.

5.2. Маршрутизация ошибочных сообщений

Таким образом, когда сообщение не может быть доставлено, оно направляется на сервер обмена недоставленными письмами. Но, как мы уже отмечали, DLX — это обычная биржа. Таким образом, если неверный ключ маршрутизации сообщений не соответствует обмену, он не будет доставлен в DLQ.

Exchange: (AMQP default)
Routing Key: foreach-messages-queue.dlq

Таким образом, если в нашем примере мы опустим аргумент x-dead-letter-routing-key , ошибочное сообщение застрянет в бесконечном цикле повторных попыток.

Кроме того, исходная метаинформация сообщения доступна в заголовке x-death :

x-death:
count: 1
exchange: foreach-messages-exchange
queue: foreach-messages-queue
reason: rejected
routing-keys: foreach-messages-queue
time: 1571232954

Приведенная выше информация доступна в консоли управления RabbitMQ, обычно работающей локально на порту 15672.

Помимо этой конфигурации, если мы используем Spring Cloud Stream , мы можем даже упростить процесс настройки, используя свойства конфигурации republishToDlq и autoBindDlq .

5.3. Обмен недоставленными письмами

В предыдущем разделе мы видели, что ключ маршрутизации изменяется, когда сообщение направляется на обмен недоставленными сообщениями. Но такое поведение не всегда желательно. Мы можем изменить его, самостоятельно настроив DLX и определив его с помощью типа разветвления :

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";

@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}

@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}

@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

На этот раз мы определили пользовательский обмен типа разветвления , поэтому сообщения будут отправляться во все ограниченные очереди . Кроме того, мы установили значение аргумента x-dead-letter-exchange в имя нашего DLX. В то же время мы удалили аргумент x-dead-letter-routing-key .

Теперь, если мы запустим наш пример, неудачное сообщение должно быть доставлено в DLQ, но без изменения начального ключа маршрутизации:

Exchange: foreach-messages-queue.dlx
Routing Key: foreach-messages-queue

5.4. Обработка сообщений очереди недоставленных сообщений

Конечно, причина, по которой мы переместили их в очередь недоставленных писем, заключается в том, чтобы их можно было повторно обработать в другое время.

Давайте определим прослушиватель для очереди недоставленных писем:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
log.info("Received failed message: {}", message.toString());
}

Если мы сейчас запустим наш пример кода, мы должны увидеть вывод журнала:

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer :
Received failed message:

Мы получили ошибочное сообщение, но что нам делать дальше? Ответ зависит от конкретных системных требований, вида исключения или типа сообщения.

Например, мы можем просто повторно поставить сообщение в исходное место назначения:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
log.info("Received failed message, requeueing: {}", failedMessage.toString());
rabbitTemplate.send(EXCHANGE_MESSAGES,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

Но такая логика исключения не отличается от политики повторных попыток по умолчанию:

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer        :
Received message:
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer :
Received failed message, requeueing:

Обычная стратегия может потребовать повторной обработки сообщения n раз, а затем отклонить его. Давайте реализуем эту стратегию, используя заголовки сообщений:

public void processFailedMessagesRetryHeaders(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
.getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null) retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Discarding message");
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties()
.getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

Сначала мы получаем значение заголовка x-retries-count , затем сравниваем это значение с максимально допустимым значением. Впоследствии, если счетчик достигает предельного количества попыток, сообщение будет отброшено:

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :
Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :
Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :
Discarding message

Мы должны добавить, что мы также можем использовать заголовок x-message-ttl , чтобы установить время, после которого сообщение должно быть отброшено. Это может быть полезно для предотвращения бесконечного роста очередей.

5.5. Очередь на парковке

С другой стороны, рассмотрим ситуацию, когда мы не можем просто отбросить сообщение, например, это может быть транзакция в банковском домене. Кроме того, иногда сообщение может потребовать ручной обработки или нам просто нужно записать сообщения, которые не удалось обработать более n раз.

Для подобных ситуаций существует понятие очереди на парковке . Мы можем пересылать все сообщения из DLQ, которые не прошли более допустимого количества раз, в очередь на парковку для дальнейшей обработки .

Давайте теперь реализуем эту идею:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";

@Bean
FanoutExchange parkingLotExchange() {
return new FanoutExchange(EXCHANGE_PARKING_LOT);
}

@Bean
Queue parkingLotQueue() {
return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}

@Bean
Binding parkingLotBinding() {
return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

Во-вторых, давайте реорганизуем логику прослушивателя, чтобы отправить сообщение в очередь на парковке:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
.getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null) retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Sending message to the parking lot queue");
rabbitTemplate.send(EXCHANGE_PARKING_LOT,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties()
.getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

В конце концов, нам также нужно обработать сообщения, поступающие в очередь на парковку:

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
log.info("Received message in parking lot queue");
// Save to DB or send a notification.
}

Теперь мы можем сохранить ошибочное сообщение в базе данных или, возможно, отправить уведомление по электронной почте.

Давайте проверим эту логику, запустив наше приложение:

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Received message in parking lot queue

Как видно из вывода, после нескольких неудачных попыток сообщение было отправлено в очередь на парковку.

6. Пользовательская обработка ошибок

В предыдущем разделе мы видели, как обрабатывать сбои с помощью выделенных очередей и обменов. Однако иногда нам может потребоваться отловить все ошибки, например, для регистрации или сохранения их в базе данных.

6.1. Глобальный обработчик ошибок

До сих пор мы использовали SimpleRabbitListenerContainerFactory по умолчанию, и эта фабрика по умолчанию использует ConditionalRejectingErrorHandler . Этот обработчик перехватывает различные исключения и преобразует их в одно из исключений в иерархии AmqpException .

Важно отметить, что если нам нужно обрабатывать ошибки подключения, нам нужно реализовать интерфейс ApplicationListener .

Проще говоря, ConditionalRejectingErrorHandler решает, отклонять конкретное сообщение или нет. Когда сообщение, вызвавшее исключение, отклонено, оно не будет помещено в очередь повторно.

Давайте определим собственный ErrorHandler , который будет просто запрашивать только BusinessException s:

public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (!(t.getCause() instanceof BusinessException)) {
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
}
}
}

Кроме того, поскольку мы выбрасываем исключение внутри нашего метода прослушивателя, оно обертывается в ListenerExecutionFailedException . Итак, нам нужно вызвать метод getCause , чтобы получить исходное исключение.

6.2. FatalExceptionСтратегии

Под капотом этот обработчик использует FatalExceptionStrategy для проверки того, следует ли считать исключение фатальным. В этом случае ошибочное сообщение будет отклонено.

По умолчанию эти исключения являются фатальными:

  • MessageConversionException
  • MessageConversionException
  • MethodArgumentNotValidException
  • MethodArgumentTypeMismatchException
  • NoSuchMethodException
  • ClassCastException

Вместо реализации интерфейса ErrorHandler мы можем просто предоставить нашу FatalExceptionStrategy :

public class CustomFatalExceptionStrategy 
extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
@Override
public boolean isFatal(Throwable t) {
return !(t.getCause() instanceof BusinessException);
}
}

Наконец, нам нужно передать нашу пользовательскую стратегию конструктору ConditionalRejectingErrorHandler :

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}

@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}

@Bean
FatalExceptionStrategy customExceptionStrategy() {
return new CustomFatalExceptionStrategy();
}

7. Заключение

В этом руководстве мы обсудили различные способы обработки ошибок при использовании Spring AMQP и, в частности, RabbitMQ.

Каждой системе нужна определенная стратегия обработки ошибок. Мы рассмотрели наиболее распространенные способы обработки ошибок в архитектурах, управляемых событиями. Кроме того, мы увидели, что можем комбинировать несколько стратегий для создания более комплексного и надежного решения.

Как всегда, полный исходный код статьи доступен на GitHub .