Перейти к основному содержимому

Управление очередями Amazon SQS в Java

· 5 мин. чтения

1. Обзор

В этом руководстве мы рассмотрим, как использовать Amazon SQS (Simple Queue Service) с помощью Java SDK .

2. Предпосылки

Зависимости Maven, настройки учетной записи AWS и клиентское подключение, необходимые для использования SDK Amazon AWS для SQS, такие же, как в этой статье здесь .

Предполагая, что мы создали экземпляр AWSCredentials, как описано в предыдущей статье, мы можем продолжить и создать наш клиент SQS:

AmazonSQS sqs = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.withRegion(Regions.US_EAST_1)
.build();

3. Создание очередей

После того, как мы настроили наш клиент SQS, создавать очереди довольно просто.

3.1. Создание стандартной очереди

Давайте посмотрим, как мы можем создать стандартную очередь. Для этого нам нужно создать экземпляр CreateQueueRequest:

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("foreach-queue");
String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl();

3.2. Создание очереди FIFO

Создание FIFO похоже на создание стандартной очереди. Мы по-прежнему будем использовать экземпляр CreateQueueRequest , как и раньше. Только на этот раз нам нужно передать атрибуты очереди и установить для атрибута FifoQueue значение true :

Map<String, String> queueAttributes = new HashMap<>();
queueAttributes.put("FifoQueue", "true");
queueAttributes.put("ContentBasedDeduplication", "true");
CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest(
"foreach-queue.fifo").withAttributes(queueAttributes);
String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest)
.getQueueUrl();

4. Отправка сообщений в очереди

Как только мы настроим наши очереди, мы можем начать отправлять сообщения.

4.1. Отправка сообщения в стандартную очередь

Чтобы отправлять сообщения в стандартную очередь, нам нужно создать экземпляр SendMessageRequest.

Затем мы присоединяем к этому запросу карту атрибутов сообщения:

Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("AttributeOne", new MessageAttributeValue()
.withStringValue("This is an attribute")
.withDataType("String"));

SendMessageRequest sendMessageStandardQueue = new SendMessageRequest()
.withQueueUrl(standardQueueUrl)
.withMessageBody("A simple message.")
.withDelaySeconds(30)
.withMessageAttributes(messageAttributes);

sqs.sendMessage(sendMessageStandardQueue);

withDelaySeconds () указывает, через какое время сообщение должно поступить в очередь.

4.2. Отправка сообщения в очередь FIFO

Единственная разница в этом случае заключается в том, что нам нужно будет указать группу , к которой принадлежит сообщение:

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest()
.withQueueUrl(fifoQueueUrl)
.withMessageBody("Another simple message.")
.withMessageGroupId("foreach-group-1")
.withMessageAttributes(messageAttributes);

Как вы можете видеть в приведенном выше примере кода, мы указываем группу с помощью withMessageGroupId().

4.3. Отправка нескольких сообщений в очередь

Мы также можем отправить несколько сообщений в очередь, используя один запрос. Мы создадим список SendMessageBatchRequestEntry , который будем отправлять, используя экземпляр SendMessageBatchRequest :

List <SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
messageEntries.add(new SendMessageBatchRequestEntry()
.withId("id-1")
.withMessageBody("batch-1")
.withMessageGroupId("foreach-group-1"));
messageEntries.add(new SendMessageBatchRequestEntry()
.withId("id-2")
.withMessageBody("batch-2")
.withMessageGroupId("foreach-group-1"));

SendMessageBatchRequest sendMessageBatchRequest
= new SendMessageBatchRequest(fifoQueueUrl, messageEntries);
sqs.sendMessageBatch(sendMessageBatchRequest);

5. Чтение сообщений из очередей

Мы можем получать сообщения из наших очередей, вызывая метод receiveMessage() для экземпляра ReceiveMessageRequest:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl)
.withWaitTimeSeconds(10)
.withMaxNumberOfMessages(10);

List<Message> sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages();

Используя withMaxNumberOfMessages(), мы указываем, сколько сообщений нужно получить из очереди — хотя следует отметить, что максимальное количество — 10 .

Метод withWaitTimeSeconds() включает длительный опрос . Длинный опрос — это способ ограничить количество запросов на получение сообщений, которые мы отправляем в SQS. ``

Проще говоря, это означает, что мы будем ждать до указанного количества секунд, чтобы получить сообщение. Если за это время в очереди нет сообщений, то запрос будет пустым. Если сообщение поступит в очередь в течение этого времени, оно будет возвращено.

Мы можем получить атрибуты и тело данного сообщения:

sqsMessages.get(0).getAttributes();
sqsMessages.get(0).getBody();

6. Удаление сообщения из очереди

Чтобы удалить сообщение, мы будем использовать DeleteMessageRequest :

sqs.deleteMessage(new DeleteMessageRequest()
.withQueueUrl(fifoQueueUrl)
.withReceiptHandle(sqsMessages.get(0).getReceiptHandle()));

7. Очереди недоставленных писем

Очередь недоставленных сообщений должна быть того же типа, что и ее базовая очередь — она должна быть FIFO, если базовая очередь FIFO, и стандартная, если базовая очередь стандартная. В этом примере мы будем использовать стандартную очередь.

Первое, что нам нужно сделать, это создать то, что станет нашей очередью недоставленных сообщений:

String deadLetterQueueUrl = sqs.createQueue("foreach-dead-letter-queue").getQueueUrl();

Далее мы получим ARN (имя ресурса Amazon) нашей вновь созданной очереди :

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes(
new GetQueueAttributesRequest(deadLetterQueueUrl)
.withAttributeNames("QueueArn"));

String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes()
.get("QueueArn");

Наконец, мы устанавливаем эту вновь созданную очередь в качестве очереди недоставленных сообщений нашей исходной стандартной очереди:

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest()
.withQueueUrl(standardQueueUrl)
.addAttributesEntry("RedrivePolicy",
"{\"maxReceiveCount\":\"2\", "
+ "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}");

sqs.setQueueAttributes(queueAttributesRequest);

Пакет JSON, который мы установили в методе addAttributesEntry() при создании нашего экземпляра SetQueueAttributesRequest , содержит необходимую нам информацию : maxReceiveCount равен 2 , что означает, что если сообщение получено столько раз, предполагается, что оно не было обработано правильно, и отправляется в нашу очередь недоставленных сообщений.

Атрибут deadLetterTargetArn указывает нашу стандартную очередь на нашу вновь созданную очередь недоставленных сообщений.

8. Мониторинг

С помощью SDK мы можем проверить, сколько сообщений в данный момент находится в заданной очереди и сколько находится в обработке. Во-первых, нам нужно создать GetQueueAttributesRequest.

Оттуда мы проверим состояние очереди:

GetQueueAttributesRequest getQueueAttributesRequest 
= new GetQueueAttributesRequest(standardQueueUrl)
.withAttributeNames("All");
GetQueueAttributesResult getQueueAttributesResult
= sqs.getQueueAttributes(getQueueAttributesRequest);
System.out.println(String.format("The number of messages on the queue: %s",
getQueueAttributesResult.getAttributes()
.get("ApproximateNumberOfMessages")));
System.out.println(String.format("The number of messages in flight: %s",
getQueueAttributesResult.getAttributes()
.get("ApproximateNumberOfMessagesNotVisible")));

Более глубокий мониторинг можно осуществить с помощью Amazon Cloud Watch .

9. Заключение

В этой статье мы увидели, как управлять очередями SQS с помощью AWS Java SDK.

Как обычно, все примеры кода, использованные в статье, можно найти на GitHub .