Перейти к основному содержимому

Spring AMQP в реактивных приложениях

· 14 мин. чтения

1. Обзор

В этом руководстве показано, как создать простое реактивное приложение Spring Boot, которое интегрируется с сервером обмена сообщениями RabbitMQ, популярной реализацией стандарта обмена сообщениями AMQP.

Мы рассмотрим оба сценария — «точка-точка» и «публикация-подписка» — с использованием распределенной установки, которая подчеркивает различия между обоими шаблонами.

Обратите внимание, что мы предполагаем базовые знания AMQP, RabbitMQ и Spring Boot, в частности, таких ключевых понятий, как обмены, очереди, темы и так далее. Более подробную информацию об этих концепциях можно найти по ссылкам ниже:

2. Настройка сервера RabbitMQ

Хотя мы могли бы настроить локальный RabbitMQ локально, на практике мы, скорее всего, будем использовать выделенную установку с дополнительными функциями, такими как высокая доступность, мониторинг, безопасность и т. д.

Чтобы смоделировать такую среду на нашей машине разработки, мы будем использовать Docker для создания сервера, который будет использовать наше приложение.

Следующая команда запустит автономный сервер RabbitMQ:

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

Мы не объявляем какой-либо постоянный том, поэтому непрочитанные сообщения будут потеряны между перезапусками. Служба будет доступна на порту 5672 на хосте.

Мы можем проверить журналы сервера с помощью команды docker logs , которая должна выдать такой вывод:

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
Application lager started on node rabbit@rabbit
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
Starting RabbitMQ 3.7.5 on Erlang 20.3.5
Copyright (C) 2007-2018 Pivotal Software, Inc.
Licensed under the MPL. See http://www.rabbitmq.com/

## ##
## ## RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
########## Licensed under the MPL. See http://www.rabbitmq.com/
###### ##
########## Logs: <stdout>

Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
node : rabbit@rabbit
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.conf
cookie hash : CY9rzUYh03PK3k6DJie09g==
log(s) : <stdout>
database dir : /var/lib/rabbitmq/mnesia/rabbit@rabbit

// ... more log lines

Поскольку образ включает утилиту rabbitmqctl , мы можем использовать ее для выполнения административных задач в контексте нашего работающего образа.

Например, мы можем получить информацию о состоянии сервера с помощью следующей команды:

$ docker exec rabbitmq rabbitmqctl status
Status of node rabbit@rabbit ...
[{pid,299},
{running_applications,
[{rabbit,"RabbitMQ","3.7.5"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.7.5"},
// ... other info omitted for brevity

Другие полезные команды включают в себя:

  • list_exchanges : Список всех объявленных бирж
  • list_queues : список всех объявленных очередей, включая количество непрочитанных сообщений.
  • list_bindings : список всех определений привязок между обменами и очередями, включая ключи маршрутизации.

3. Настройка проекта Spring AMQP

Как только наш сервер RabbitMQ будет запущен и запущен, мы можем перейти к созданию нашего проекта Spring. Этот пример проекта позволит любому клиенту REST отправлять и/или получать сообщения на сервер обмена сообщениями, используя модуль Spring AMQP и соответствующий стартер Spring Boot для связи с ним.

Основные зависимости, которые нам нужно добавить в наш файл проекта pom.xml :

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

Spring-boot-starter-amqp содержит все материалы, связанные с AMQP, тогда как spring-boot-starter-webflux является основной зависимостью, используемой для реализации нашего реактивного сервера REST.

Примечание. Вы можете проверить последнюю версию модулей Spring Boot Starter AMQP и Webflux на Maven Central.

4. Сценарий 1: двухточечный обмен сообщениями

В этом первом сценарии мы будем использовать прямой обмен, который является логическим объектом в брокере, который получает сообщения от клиентов.

Прямой обмен будет направлять все входящие сообщения в одну и только одну очередь , из которой они будут доступны для потребления клиентами. Несколько клиентов могут подписаться на одну и ту же очередь, но только один из них получит данное сообщение.

4.1. Обмен и настройка очередей

В нашем сценарии мы используем объект DestinationInfo , который инкапсулирует имя биржи и ключ маршрутизации. Карта с указанием имени пункта назначения будет использоваться для хранения всех доступных пунктов назначения.

За эту первоначальную настройку будет отвечать следующий метод @PostConstruct :

@Autowired
private AmqpAdmin amqpAdmin;

@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
destinationsConfig.getQueues()
.forEach((key, destination) -> {
Exchange ex = ExchangeBuilder.directExchange(
destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder.durable(
destination.getRoutingKey())
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
});
}

Этот метод использует bean- компонент adminAmqp , созданный Spring, для объявления обменов, очередей и связывания их вместе с использованием заданного ключа маршрутизации.

Все назначения поступают из bean-компонента DestinationsConfig , который является классом @ConfigurationProperties , используемым в нашем примере.

Этот класс имеет свойство, заполненное объектами DestinationInfo , созданными на основе сопоставлений, считанных из файла конфигурации application.yml .

4.2. Конечная точка производителя

Производители будут отправлять сообщения, отправляя HTTP POST в местоположение /queue/{name} .

Это реактивная конечная точка, поэтому мы используем Mono для возврата простого подтверждения:

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {

// ... other members omitted

@Autowired
private AmqpTemplate amqpTemplate;

@PostMapping(value = "/queue/{name}")
public Mono<ResponseEntity<?>> sendMessageToQueue(
@PathVariable String name, @RequestBody String payload) {

DestinationInfo d = destinationsConfig
.getQueues().get(name);
if (d == null) {
return Mono.just(
ResponseEntity.notFound().build());
}

return Mono.fromCallable(() -> {
amqpTemplate.convertAndSend(
d.getExchange(),
d.getRoutingKey(),
payload);
return ResponseEntity.accepted().build();
});
}

Сначала мы проверяем, соответствует ли параметр имени допустимому месту назначения, и если да, мы используем экземпляр amqpTemplate с автоматическим подключением , чтобы фактически отправить полезную нагрузку — простое строковое сообщение — в RabbitMQ.

4.3. Фабрика прослушивателей сообщений

Для асинхронного получения сообщений Spring AMQP использует абстрактный класс MessageContainerListener , который является посредником в потоке информации из очередей AMQP и слушателей, предоставляемых приложением.

Поскольку нам нужна конкретная реализация этого класса, чтобы прикрепить наши прослушиватели сообщений, мы определяем фабрику, которая изолирует код контроллера от его фактической реализации.

В нашем случае фабричный метод возвращает новый SimpleMessageContainerListener каждый раз, когда мы вызываем его метод createMessageListenerContainer :

@Component
public class MessageListenerContainerFactory {

@Autowired
private ConnectionFactory connectionFactory;

public MessageListenerContainerFactory() {}

public MessageListenerContainer createMessageListenerContainer(String queueName) {
SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
mlc.addQueueNames(queueName);
return mlc;
}
}

4.4. Потребительская конечная точка

Потребители будут получать доступ к тому же адресу конечной точки, который используется производителями ( /queue/{name} ) для получения сообщений.

Эта конечная точка возвращает поток событий, где каждое событие соответствует полученному сообщению:

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
value = "/queue/{name}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

DestinationInfo d = destinationsConfig
.getQueues()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}

MessageListenerContainer mlc = messageListenerContainerFactory
.createMessageListenerContainer(d.getRoutingKey());

Flux<String> f = Flux.<String> create(emitter -> {
mlc.setupMessageListener((MessageListener) m -> {
String payload = new String(m.getBody());
emitter.next(payload);
});
emitter.onRequest(v -> {
mlc.start();
});
emitter.onDispose(() -> {
mlc.stop();
});
});

return Flux.interval(Duration.ofSeconds(5))
.map(v -> "No news is good news")
.mergeWith(f);
}

После первоначальной проверки имени назначения конечная точка потребителя создает MessageListenerContainer, используя MessageListenerContainerFactory и имя очереди, восстановленное из нашего реестра.

Получив наш MessageListenerContainer , мы создаем поток сообщений, используя один из его методов построения () .

В нашем конкретном случае мы используем тот, который принимает лямбду с аргументом FluxSink , который мы затем используем для соединения асинхронного API Spring AMQP на основе прослушивателя с нашим реактивным приложением.

Мы также присоединяем две дополнительные лямбда-выражения к обратным вызовам эмиттера onRequest() и onDispose() , чтобы наш MessageListenerContainer мог выделять/освобождать свои внутренние ресурсы в соответствии с жизненным циклом Flux .

Наконец, мы объединяем полученный поток с другим, созданным с помощью interval(), который создает новое событие каждые пять секунд. Эти фиктивные сообщения играют важную роль в нашем случае : без них мы бы обнаружили отключение клиента только после получения сообщения и невозможности его отправки, что может занять много времени в зависимости от вашего конкретного варианта использования.

4.5. Тестирование

Теперь, когда у нас настроены конечные точки потребителя и издателя, мы можем провести некоторые тесты с нашим образцом приложения.

Нам нужно определить детали подключения к серверу RabbitMQ и по крайней мере один пункт назначения в нашем application.yml , который должен выглядеть следующим образом:

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

destinations:
queues:
NYSE:
exchange: nyse
routing-key: NYSE

Свойства spring.rabbitmq.* определяют основные свойства, необходимые для подключения к нашему серверу RabbitMQ, работающему в локальном контейнере Docker. Обратите внимание, что IP-адрес, показанный выше, является лишь примером и может отличаться в конкретной настройке.

Очереди определяются с использованием destinations.queues.<name>.* , где <name> используется в качестве имени назначения. Здесь мы объявили один пункт назначения с именем «NYSE», который будет отправлять сообщения на биржу «nyse» на RabbitMQ с ключом маршрутизации «NYSE».

Как только мы запустим сервер через командную строку или из нашей IDE, мы сможем начать отправлять и получать сообщения. Мы будем использовать утилиту curl , общую утилиту, доступную для ОС Windows, Mac и Linux.

В следующем листинге показано, как отправить сообщение в пункт назначения и ожидаемый ответ от сервера:

$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

После выполнения этой команды мы можем убедиться, что сообщение было получено RabbitMQ и готово к использованию, выполнив следующую команду:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE 1

Теперь мы можем читать сообщения с помощью curl с помощью следующей команды:

$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news...

... same message repeating every 5 secs

Как мы видим, сначала мы получаем ранее сохраненное сообщение, а затем начинаем получать наше фиктивное сообщение каждые 5 секунд.

Если мы снова запустим команду для вывода списка очередей, мы увидим, что сообщений не сохранено:

$ docker exec rabbitmq rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE 0

5. Сценарий 2: публикация-подписка

Другим распространенным сценарием для приложений обмена сообщениями является шаблон публикации-подписки, в котором одно сообщение должно быть отправлено нескольким потребителям.

RabbitMQ предлагает два типа обменов, которые поддерживают такие приложения: разветвление и тематика.

Основное различие между этими двумя типами заключается в том, что последний позволяет нам фильтровать, какие сообщения получать, на основе шаблона ключа маршрутизации (например, «alarm.mailserver.*»), предоставленного во время регистрации, в то время как первый просто реплицирует входящие сообщения на все связанные очереди.

RabbitMQ также поддерживает обмен заголовками, что позволяет выполнять более сложную фильтрацию сообщений, но его использование выходит за рамки этой статьи.

5.1. Настройка направлений

Мы определяем места назначения Pub/Sub во время запуска с помощью другого метода @PostConstruct , как мы это делали в сценарии «точка-точка».

Единственная разница в том, что мы создаем только Exchanges , но не Queues — они будут созданы по запросу и привязаны к Exchange позже, так как нам нужна эксклюзивная Queue для каждого клиента:

@PostConstruct
public void setupTopicDestinations(
destinationsConfig.getTopics()
.forEach((key, destination) -> {
Exchange ex = ExchangeBuilder
.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
});
}

5.2. Конечная точка издателя

Клиенты будут использовать конечную точку издателя, доступную в расположении /topic/{name} , для публикации сообщений, которые будут отправлены всем подключенным клиентам.

Как и в предыдущем сценарии, мы используем @PostMapping , который возвращает Mono со статусом после отправки сообщения:

@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
@PathVariable String name, @RequestBody String payload) {

DestinationInfo d = destinationsConfig
.getTopics()
.get(name);

if (d == null) {
return Mono.just(ResponseEntity.notFound().build());
}

return Mono.fromCallable(() -> {
amqpTemplate.convertAndSend(
d.getExchange(), d.getRoutingKey(),payload);
return ResponseEntity.accepted().build();
});
}

5.3. Конечная точка подписчика

Наша конечная точка подписчика будет расположена по адресу /topic/{name} , создавая поток сообщений для подключенных клиентов.

Эти сообщения включают в себя как полученные сообщения, так и фиктивные сообщения, генерируемые каждые 5 секунд:

@GetMapping(
value = "/topic/{name}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
DestinationInfo d = destinationsConfig.getTopics()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}
Queue topicQueue = createTopicQueue(d);
String qname = topicQueue.getName();
MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
Flux<String> f = Flux.<String> create(emitter -> {
mlc.setupMessageListener((MessageListener) m -> {
String payload = new String(m.getBody());
emitter.next(payload);
});
emitter.onRequest(v -> {
mlc.start();
});
emitter.onDispose(() -> {
amqpAdmin.deleteQueue(qname);
mlc.stop();
});
});

return Flux.interval(Duration.ofSeconds(5))
.map(v -> "No news is good news")
.mergeWith(f);
}

Этот код в основном такой же, как мы видели в предыдущем случае, со следующими отличиями: во-первых, мы создаем новую очередь для каждого нового подписчика.

Мы делаем это с помощью вызова метода createTopicQueue() , который использует информацию из экземпляра DestinationInfo для создания эксклюзивной неустойчивой очереди, которую мы затем привязываем к Exchange с помощью настроенного ключа маршрутизации:

private Queue createTopicQueue(DestinationInfo destination) {

Exchange ex = ExchangeBuilder
.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder
.nonDurable()
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
return q;
}

Обратите внимание, что, несмотря на то, что мы снова объявляем Exchange , RabbitMQ не создаст новый, поскольку мы уже объявили его во время запуска.

Второе отличие заключается в лямбда-выражении, которое мы передаем методу onDispose() , который на этот раз также удалит очередь при отключении подписчика.

5.3. Тестирование

Чтобы протестировать сценарий Pub-Sub, мы должны сначала определить назначение темы в нашем application.yml следующим образом:

destinations:
## ... queue destinations omitted
topics:
weather:
exchange: alerts
routing-key: WEATHER

Здесь мы определили конечную точку темы, которая будет доступна в расположении /topic/weather . Эта конечная точка будет использоваться для отправки сообщений в обмен «оповещениями» на RabbitMQ с ключом маршрутизации «ПОГОДА».

После запуска сервера мы можем убедиться, что обмен был создан с помощью команды rabbitmqctl :

$ docker exec docker_rabbitmq_1 rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic topic
amq.fanout fanout
amq.match headers
amq.headers headers
direct
amq.rabbitmq.trace topic
amq.direct direct
alerts topic

Теперь, если мы выполним команду list_bindings , мы увидим, что нет очередей, связанных с обменом «оповещениями»:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange NYSE queue NYSE []
nyse exchange NYSE queue NYSE []

Давайте запустим пару подписчиков, которые будут подписываться на наш пункт назначения, открыв две командные оболочки и выполнив следующую команду для каждой:

$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

# ... same message repeating indefinitely

Наконец, мы снова используем curl для отправки уведомлений нашим подписчикам:

$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

Как только мы отправим сообщение, мы почти сразу же увидим сообщение «Ураган приближается!» на оболочке каждого абонента.

Если мы сейчас проверим доступные привязки, то увидим, что у нас есть одна очередь для каждого подписчика:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange IBOV queue IBOV []
exchange NYSE queue NYSE []
exchange spring.gen-i0m0pbyKQMqpz2_KFZCd0g
queue spring.gen-i0m0pbyKQMqpz2_KFZCd0g []
exchange spring.gen-wCHALTsIS1q11PQbARJ7eQ
queue spring.gen-wCHALTsIS1q11PQbARJ7eQ []
alerts exchange spring.gen-i0m0pbyKQMqpz2_KFZCd0g
queue WEATHER []
alerts exchange spring.gen-wCHALTsIS1q11PQbARJ7eQ
queue WEATHER []
ibov exchange IBOV queue IBOV []
nyse exchange NYSE queue NYSE []
quotes exchange NYSE queue NYSE []

Как только мы нажмем Ctrl-C в оболочке подписчика, наш шлюз в конечном итоге обнаружит, что клиент отключился, и удалит эти привязки.

6. Заключение

В этой статье мы продемонстрировали, как создать простое реактивное приложение, которое взаимодействует с сервером RabbitMQ, используя модуль spring-amqp .

С помощью всего нескольких строк кода мы смогли создать функциональный шлюз HTTP-to-AMQP, который поддерживает шаблоны интеграции «точка-точка» и «публикация-подписка», которые мы можем легко расширить, добавив дополнительные функции, такие как безопасность с помощью добавление стандартных функций Spring.

Код, показанный в этой статье, доступен на Github.