1. Обзор
В этом руководстве мы рассмотрим использование очередей сообщений и издателей/подписчиков. Это общие шаблоны, используемые в распределенных системах для двух или более служб для связи друг с другом.
В этом руководстве все примеры будут показаны с использованием брокера сообщений RabbitMQ, поэтому сначала следуйте руководству RabbitMQ, чтобы настроить и запустить его локально. Для более глубокого погружения в RabbitMQ ознакомьтесь с другим нашим руководством .
Примечание. Существует множество альтернатив RabbitMQ, которые можно использовать для тех же примеров в этом руководстве, таких как Kafka , Google Cloud Pub-Sub и Amazon SQS , и это лишь некоторые из них.
2. Что такое очереди сообщений?
Начнем с рассмотрения очередей сообщений. Очереди сообщений состоят из службы публикации и нескольких служб-потребителей, которые взаимодействуют через очередь. Это общение обычно является одним из способов, с помощью которого издатель будет отдавать команды потребителям. Служба публикации обычно помещает сообщение в очередь или обменивается, а одна служба-потребитель обрабатывает это сообщение и выполняет действие на его основе.
Рассмотрим следующий обмен:
Отсюда мы видим службу издателя, которая помещает сообщение «m n+1» в очередь. Кроме того, мы также можем видеть несколько сообщений, уже существующих в очереди, ожидающих обработки. С правой стороны у нас есть 2 потребляющих сервиса «A» и «B», которые прослушивают очередь сообщений.
Теперь рассмотрим тот же обмен через некоторое время:
Во-первых, мы видим, что сообщение издателя помещено в конец очереди. Далее, важной частью, которую следует учитывать, является правая сторона изображения. Мы видим, что потребитель «А» прочитал сообщение «m 1», и поэтому оно больше не доступно в очереди для использования другой службой «В».
2.1. Где использовать очереди сообщений
Очереди сообщений часто используются там, где мы хотим делегировать работу из службы. При этом мы хотим гарантировать, что работа выполняется только один раз.
Использование очередей сообщений популярно в микросервисных архитектурах и при разработке облачных или бессерверных приложений, поскольку это позволяет нам горизонтально масштабировать наше приложение в зависимости от нагрузки.
Например, если в очереди есть много сообщений, ожидающих обработки, мы можем запустить несколько служб-потребителей, которые прослушивают одну и ту же очередь сообщений и обрабатывают приток сообщений. После обработки сообщений службы могут быть отключены, когда трафик минимален, чтобы сэкономить на текущих расходах.
2.2. Пример использования RabbitMQ
Давайте рассмотрим пример для ясности. Наш пример примет форму пиццерии. Представьте, что люди могут заказывать пиццу через приложение, а повара в пиццерии будут забирать заказы по мере их поступления. В этом примере клиент — наш издатель, а повара — наши потребители.
Во-первых, давайте определим нашу очередь:
private static final String MESSAGE_QUEUE = "pizza-message-queue";
@Bean
public Queue queue() {
return new Queue(MESSAGE_QUEUE);
}
Используя Spring AMQP, мы создали очередь с именем «pizza-message-queue». Далее давайте определим нашего издателя, который будет публиковать сообщения в нашу вновь определенную очередь:
public class Publisher {
private RabbitTemplate rabbitTemplate;
private String queue;
public Publisher(RabbitTemplate rabbitTemplate, String queue) {
this.rabbitTemplate = rabbitTemplate;
this.queue = queue;
}
@PostConstruct
public void postMessages() {
rabbitTemplate.convertAndSend(queue, "1 Pepperoni");
rabbitTemplate.convertAndSend(queue, "3 Margarita");
rabbitTemplate.convertAndSend(queue, "1 Ham and Pineapple (yuck)");
}
}
Spring AMQP создаст для нас bean-компонент RabbitTemplate
, который имеет подключение к нашему обмену RabbitMQ, чтобы уменьшить накладные расходы на настройку. Наш издатель использует это, отправляя 3 сообщения в нашу очередь.
Теперь, когда наши заказы на пиццу поступили, нам нужно отдельное потребительское приложение. Это будет действовать как наш шеф-повар в примере и читать сообщения:
public class Consumer {
public void receiveOrder(String message) {
System.out.printf("Order received: %s%n", message);
}
}
Давайте теперь создадим MessageListenerAdapter
для нашей очереди, который будет вызывать метод порядка получения нашего Потребителя, используя отражение:
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(MESSAGE_QUEUE);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(Consumer consumer) {
return new MessageListenerAdapter(consumer, "receiveOrder");
}
Сообщения, прочитанные из очереди, теперь будут направляться в метод receiveOrder класса
Consumer
. Чтобы запустить это приложение, мы можем создать столько потребительских приложений, сколько захотим для выполнения входящих заказов. Например, если в очередь было поставлено 400 заказов на пиццу, нам может понадобиться более одного «шеф-повара» потребителя, иначе заказы будут выполняться медленно. В этом случае мы могли бы запустить 10 инстансов-потребителей для своевременного выполнения заказов.
3. Что такое паб-саб?
Теперь, когда мы рассмотрели очереди сообщений, давайте рассмотрим pub-sub. И наоборот, для очередей сообщений в архитектуре pub-sub мы хотим, чтобы все наши потребляющие (подписывающиеся) приложения получали по крайней мере 1
копию сообщения, которое наш издатель отправляет на биржу.
Рассмотрим следующий обмен:
Слева у нас есть издатель, отправляющий сообщение «m n+1» в тему. Эта тема будет транслировать это сообщение своим подписчикам. Эти подписки привязаны к очередям. Каждая очередь имеет прослушивающую абонентскую службу, ожидающую сообщения.
Давайте теперь рассмотрим тот же обмен через некоторое время:
Обе службы подписки потребляют «m 1», так как обе получили копию этого сообщения. Кроме того, Topic рассылает новое сообщение «m n+1» всем своим подписчикам.
Pub sub следует использовать там, где нам нужна гарантия того, что каждый подписчик получит копию сообщения.
3.1. Пример использования RabbitMQ
Представьте, что у нас есть сайт одежды. Этот веб-сайт может отправлять push-уведомления пользователям, чтобы уведомить их о сделках. Наша система может отправлять уведомления по электронной почте или текстовые оповещения. В этом сценарии веб-сайт является нашим издателем, а службы текстовых и электронных уведомлений — нашими подписчиками.
Для начала определим нашу тему обмена и привяжем к ней 2 очереди:
private static final String PUB_SUB_TOPIC = "notification-topic";
private static final String PUB_SUB_EMAIL_QUEUE = "email-queue";
private static final String PUB_SUB_TEXT_QUEUE = "text-queue";
@Bean
public Queue emailQueue() {
return new Queue(PUB_SUB_EMAIL_QUEUE);
}
@Bean
public Queue textQueue() {
return new Queue(PUB_SUB_TEXT_QUEUE);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(PUB_SUB_TOPIC);
}
@Bean
public Binding emailBinding(Queue emailQueue, TopicExchange exchange) {
return BindingBuilder.bind(emailQueue).to(exchange).with("notification");
}
@Bean
public Binding textBinding(Queue textQueue, TopicExchange exchange) {
return BindingBuilder.bind(textQueue).to(exchange).with("notification");
}
Теперь мы связали 2 очереди с помощью ключа маршрутизации «уведомление», что означает, что любые сообщения, размещенные в теме с этим ключом маршрутизации, будут отправляться в обе очереди. Обновляя класс Publisher
, который мы создали ранее, мы можем отправлять некоторые сообщения на наш обмен:
rabbitTemplate.convertAndSend(topic, "notification", "New Deal on T-Shirts: 95% off!");
rabbitTemplate.convertAndSend(topic, "notification", "2 for 1 on all Jeans!");
4. Сравнение
Теперь, когда мы затронули обе области, давайте кратко сравним оба типа обмена.
Как упоминалось ранее, как очереди сообщений, так и шаблоны архитектуры pub-sub — отличный способ разбить приложение, чтобы сделать его более масштабируемым по горизонтали.
Еще одно преимущество использования очередей pub-sub или очередей сообщений заключается в том, что связь более надежна, чем традиционные синхронные способы связи. Например, если приложение A связывается с приложением B через асинхронный HTTP-вызов, тогда, если какое-либо из приложений выходит из строя, данные теряются, и запрос необходимо повторить.
Использование очередей сообщений, если экземпляр приложения-потребителя выходит из строя, тогда другой потребитель сможет вместо этого обработать сообщение. Используя pub-sub, если подписчик не работает, то после его восстановления пропущенные сообщения будут доступны для потребления в его очереди подписки.
Наконец, контекст является ключевым. Выбор того, использовать ли архитектуру pub-sub или очереди сообщений, сводится к точному определению того, как вы хотите, чтобы служба-потребитель вел себя. Самый важный фактор, о котором следует помнить, — это вопрос: «Имеет ли значение, если каждый потребитель получит каждое сообщение? ”
5. Вывод
В этом руководстве мы рассмотрели очереди публикации и подписки и сообщения, а также некоторые характеристики каждой из них. Весь код, упомянутый в этом руководстве, можно найти на GitHub .