1. Введение
В этом руководстве мы рассмотрим концепцию разветвления
и обмена темами с помощью Spring AMQP и RabbitMQ .
На высоком уровне разветвленные обмены будут транслировать одно и то же сообщение во все связанные очереди , в то время как тематические обмены используют ключ маршрутизации для передачи сообщений в определенную связанную очередь или очереди .
Для этого руководства рекомендуется предварительно прочитать Messaging With Spring AMQP .
2. Настройка разветвленной биржи
Давайте настроим один разветвленный обмен с двумя привязанными к нему очередями. Когда мы отправим сообщение на этот обмен, обе очереди получат сообщение. Наш разветвленный обмен игнорирует любой ключ маршрутизации, включенный в сообщение.
Spring AMQP позволяет нам агрегировать все объявления очередей, обменов и привязок в объекте Declarables
:
@Bean
public Declarables fanoutBindings() {
Queue fanoutQueue1 = new Queue("fanout.queue1", false);
Queue fanoutQueue2 = new Queue("fanout.queue2", false);
FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
return new Declarables(
fanoutQueue1,
fanoutQueue2,
fanoutExchange,
bind(fanoutQueue1).to(fanoutExchange),
BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}
3. Настройка обмена темами
Теперь мы также настроим обмен темами с двумя очередями, каждая из которых имеет свой шаблон привязки:
@Bean
public Declarables topicBindings() {
Queue topicQueue1 = new Queue(topicQueue1Name, false);
Queue topicQueue2 = new Queue(topicQueue2Name, false);
TopicExchange topicExchange = new TopicExchange(topicExchangeName);
return new Declarables(
topicQueue1,
topicQueue2,
topicExchange,
BindingBuilder
.bind(topicQueue1)
.to(topicExchange).with("*.important.*"),
BindingBuilder
.bind(topicQueue2)
.to(topicExchange).with("#.error"));
}
Тематический обмен позволяет нам привязывать к нему очереди с разными шаблонами ключей. Это очень гибко и позволяет нам привязывать несколько очередей с одним и тем же шаблоном или даже несколько шаблонов к одной и той же очереди.
Когда ключ маршрутизации сообщения соответствует шаблону, оно будет помещено в очередь. Если в очереди есть несколько привязок, соответствующих ключу маршрутизации сообщения, в очередь помещается только одна копия сообщения.
Наши шаблоны привязки могут использовать звездочку («*») для соответствия слову в определенной позиции или знак решетки («#») для соответствия нулю или более слов.
Таким образом, наша темаQueue1
будет получать сообщения с ключами маршрутизации, имеющими шаблон из трех слов со средним словом «важный», например: «user.important.error»
или «blog.important.notification».
И наша темаQueue2
будет получать сообщения с ключами маршрутизации, оканчивающимися на слово error; примерами соответствия являются «error»
, «user.important.error»
или «blog.post.save.error».
4. Настройка продюсера
Мы будем использовать метод convertAndSend
RabbitTemplate
для отправки наших примеров сообщений:
String message = " payload is broadcast";
return args -> {
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN,
"topic important warn" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR,
"topic important error" + message);
};
RabbitTemplate предоставляет множество перегруженных методов
convertAndSend()
для различных типов обмена.
Когда мы отправляем сообщение на разветвленный обмен, ключ маршрутизации игнорируется, и сообщение передается во все связанные очереди.
Когда мы отправляем сообщение в тему обмена, нам нужно передать ключ маршрутизации. На основе этого ключа маршрутизации сообщение будет доставлено в определенные очереди.
5. Настройка потребителей
Наконец, давайте настроим четырех потребителей — по одному на каждую очередь — для получения созданных сообщений:
@RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
public void receiveMessageFromFanout1(String message) {
System.out.println("Received fanout 1 message: " + message);
}
@RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
public void receiveMessageFromFanout2(String message) {
System.out.println("Received fanout 2 message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
public void receiveMessageFromTopic1(String message) {
System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
public void receiveMessageFromTopic2(String message) {
System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
}
Мы настраиваем потребителей с помощью аннотации @RabbitListener
. Здесь передается только один аргумент — имя очереди. Потребители здесь не знают об биржах или ключах маршрутизации.
6. Запуск примера
Наш пример проекта представляет собой приложение Spring Boot, поэтому он инициализирует приложение вместе с подключением к RabbitMQ и настроит все очереди, обмены и привязки.
По умолчанию наше приложение ожидает экземпляр RabbitMQ, работающий на локальном хосте через порт 5672. Мы можем изменить это и другие значения по умолчанию в application.yaml
.
Наш проект предоставляет конечную точку HTTP в URI — /broadcast
— которая принимает POST с сообщением в теле запроса.
Когда мы отправляем запрос на этот URI с телом «Test», мы должны увидеть что-то похожее на это в выводе:
Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast
Порядок, в котором мы увидим эти сообщения, конечно, не гарантирован.
7. Заключение
В этом кратком руководстве мы рассмотрели разветвления и обмены темами с помощью Spring AMQP и RabbitMQ.
Полный исходный код и все фрагменты кода для этого руководства доступны в репозитории GitHub .