Перейти к основному содержимому

Экспоненциальная отсрочка с Spring AMQP

· 7 мин. чтения

1. Введение

По умолчанию в Spring AMQP ошибочное сообщение повторно ставится в очередь для следующего раунда потребления. Следовательно, может возникнуть бесконечный цикл потребления, что приведет к нестабильной ситуации и пустой трате ресурсов.

Хотя использование очереди недоставленных сообщений является стандартным способом обработки ошибочных сообщений , мы можем захотеть повторить попытку использования сообщений и вернуть систему в нормальное состояние.

В этом руководстве мы представим два разных способа реализации стратегии повторных попыток под названием «Экспоненциальный откат » .

2. Предпосылки

В этом руководстве мы будем использовать RabbitMQ , популярную реализацию AMQP . Следовательно, мы можем обратиться к этой статье Spring AMQP для получения дальнейших инструкций о том, как настроить и использовать RabbitMQ с Spring.

Для простоты мы также будем использовать образ докера для нашего экземпляра RabbitMQ, хотя подойдет любой экземпляр RabbitMQ, прослушивающий порт 5672.

Давайте запустим док-контейнер RabbitMQ:

docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management

Чтобы реализовать наши примеры, нам нужно добавить зависимость от spring-boot-starter-amqp . Последняя версия доступна на Maven Central :

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
</dependencies>

3. Блокирующий путь

Наш первый способ будет использовать фикстуры Spring Retry . Мы создадим простую очередь и потребителя, настроенного на ожидание в течение некоторого времени между повторными попытками ошибочного сообщения.

Во-первых, давайте создадим нашу очередь:

@Bean
public Queue blockingQueue() {
return QueueBuilder.nonDurable("blocking-queue").build();
}

Во-вторых, давайте настроим стратегию отсрочки в RetryOperationsInterceptor и подключим ее к пользовательской RabbitListenerContainerFactory :

@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(5)
.recoverer(observableRecoverer())
.build();
}

@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);

Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);

return factory;
}

Как показано выше, мы настраиваем начальный интервал 1000 мс и множитель 3,0 до максимального времени ожидания 10000 мс. Кроме того, после пяти попыток сообщение будет удалено.

Давайте добавим нашего потребителя и вызовем ошибочное сообщение, создав исключение:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
logger.info("Processing message from blocking-queue: {}", payload);

throw new Exception("exception occured!");
}

Наконец, давайте создадим тест и отправим в нашу очередь два сообщения:

@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
int nb = 2;

CountDownLatch latch = new CountDownLatch(nb);
observableRecoverer.setObserver(() -> latch.countDown());

for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
}

latch.await();
}

Имейте в виду, что CountdownLatch используется только как тестовое приспособление.

Давайте запустим тест и проверим вывод нашего журнала:

2020-02-18 21:17:55.638  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875 ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!

Как видно, этот журнал правильно показывает экспоненциальное время ожидания между каждой повторной попыткой. Пока наша стратегия отсрочки работает, наш потребитель блокируется до исчерпания повторных попыток. Тривиальное улучшение состоит в том, чтобы заставить нашего потребителя выполняться одновременно, установив атрибут concurrency @RabbitListener :

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")

Однако повторная попытка сообщения по-прежнему блокирует экземпляр потребителя. Поэтому приложение может страдать от проблем с задержкой.

В следующем разделе мы представим неблокирующий способ реализации аналогичной стратегии.

4. Неблокирующий способ

Альтернативный способ включает несколько очередей повторных попыток в сочетании с истечением срока действия сообщения. На самом деле, когда срок действия сообщения истекает, оно попадает в очередь недоставленных сообщений. Другими словами, если потребитель DLQ отправляет сообщение обратно в исходную очередь, мы, по сути, выполняем цикл повторных попыток .

В результате количество использованных очередей повторных попыток равно числу возможных попыток .

Во-первых, давайте создадим очередь недоставленных сообщений для наших очередей повторных попыток:

@Bean
public Queue retryWaitEndedQueue() {
return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}

Давайте добавим потребителя в очередь повторных недоставленных сообщений. Единственная ответственность этого потребителя заключается в отправке сообщения обратно в исходную очередь :

@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{
MessageProperties props = message.getMessageProperties();

rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"),
props.getHeader("x-original-routing-key"), message);
}

Во-вторых, давайте создадим объект-оболочку для наших очередей повторных попыток. Этот объект будет содержать экспоненциальную конфигурацию отсрочки:

public class RetryQueues {
private Queue[] queues;
private long initialInterval;
private double factor;
private long maxWait;

// constructor, getters and setters

В-третьих, давайте определим три очереди повторных попыток:

@Bean
public Queue retryQueue1() {
return QueueBuilder.nonDurable("retry-queue-1")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}

@Bean
public Queue retryQueue2() {
return QueueBuilder.nonDurable("retry-queue-2")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}

@Bean
public Queue retryQueue3() {
return QueueBuilder.nonDurable("retry-queue-3")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}

@Bean
public RetryQueues retryQueues() {
return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}

Затем нам нужен перехватчик для обработки потребления сообщений:

public class RetryQueuesInterceptor implements MethodInterceptor {

// fields and constructor

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
try {
int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
sendToNextRetryQueue(messageAndChannel, retryCount);
} catch (Throwable t) {
// ...
throw new RuntimeException(t);
}
});
}

В случае успешного возврата потребителя мы просто подтверждаем сообщение.

Однако, если потребитель выбрасывает исключение и остались попытки, мы отправляем сообщение в следующую очередь повторных попыток:

private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
String retryQueueName = retryQueues.getQueueName(retryCount);

rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
MessageProperties props = m.getMessageProperties();
props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
props.setHeader("x-original-exchange", props.getReceivedExchange());
props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());

return m;
});

mac.channel.basicReject(mac.message.getMessageProperties()
.getDeliveryTag(), false);
}

Опять же, давайте подключим наш перехватчик к пользовательской RabbitListenerContainerFactory :

@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);

Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);

return factory;
}

Наконец, мы определяем нашу основную очередь и потребителя, который имитирует ошибочное сообщение:

@Bean
public Queue nonBlockingQueue() {
return QueueBuilder.nonDurable("non-blocking-queue")
.build();
}

@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory",
ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
logger.info("Processing message from non-blocking-queue: {}", payload);

throw new Exception("Error occured!");
}

Давайте создадим еще один тест и отправим два сообщения:

@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
int nb = 2;

CountDownLatch latch = new CountDownLatch(nb);
retryQueues.setObserver(() -> latch.countDown());

for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
}

latch.await();
}

Затем запустим наш тест и проверим лог:

2020-02-19 10:31:40.640  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:44.420 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.751 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!

Опять же, мы видим экспоненциальное время ожидания между каждой повторной попыткой. Однако вместо блокировки до тех пор, пока не будет предпринята каждая попытка, сообщения обрабатываются одновременно .

Хотя эта настройка довольно гибкая и помогает решить проблемы с задержкой, существует распространенная ошибка. Действительно, RabbitMQ удаляет сообщение с истекшим сроком действия только тогда, когда оно достигает головы очереди . Поэтому, если сообщение имеет больший срок действия, оно заблокирует все остальные сообщения в очереди. По этой причине очередь ответов должна содержать только сообщения с одинаковым сроком действия .

4. Вывод

Как показано выше, событийные системы могут реализовывать экспоненциальную стратегию отсрочки для повышения отказоустойчивости. Хотя внедрение таких решений может быть тривиальным, важно понимать, что определенное решение может быть хорошо адаптировано к небольшой системе, но вызывать проблемы с задержкой в экосистемах с высокой пропускной способностью.

Исходный код доступен на GitHub .