Перейти к основному содержимому

Введение в Spring Cloud Stream

· 8 мин. чтения

1. Обзор

Spring Cloud Stream — это платформа, созданная на основе Spring Boot и Spring Integration, которая помогает создавать микросервисы, управляемые событиями или сообщениями .

В этой статье мы познакомим вас с концепциями и конструкциями Spring Cloud Stream на нескольких простых примерах.

2. Зависимости Maven

Для начала нам нужно добавить Spring Cloud Starter Stream с зависимостью брокера RabbitMQ Maven в качестве промежуточного программного обеспечения для обмена сообщениями в наш pom.xml :

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.1.3</version>
</dependency>

И мы добавим зависимость модуля от Maven Central , чтобы также включить поддержку JUnit:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<version>3.1.3</version>
<scope>test</scope>
</dependency>

3. Основные понятия

Архитектура микросервисов следует принципу « умные конечные точки и тупые каналы ». Связь между конечными точками управляется сторонами промежуточного программного обеспечения для обмена сообщениями, такими как RabbitMQ или Apache Kafka. Службы обмениваются данными, публикуя события предметной области через эти конечные точки или каналы .

Давайте рассмотрим концепции, из которых состоит среда Spring Cloud Stream, а также основные парадигмы, которые мы должны знать для создания сервисов, управляемых сообщениями.

3.1. Конструкты

Давайте рассмотрим простой сервис в Spring Cloud Stream, который прослушивает входную привязку и отправляет ответ на выходную привязку:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}

Аннотация @EnableBinding настраивает приложение для привязки каналов INPUT и OUTPUT , определенных в интерфейсе Processor . Оба канала являются привязками, которые можно настроить для использования конкретного промежуточного программного обеспечения или связывателя сообщений.

Давайте посмотрим на определение всех этих понятий:

  • Bindings — набор интерфейсов, декларативно идентифицирующих входные и выходные каналы.
  • Binder — реализация промежуточного программного обеспечения для обмена сообщениями, такого как Kafka или RabbitMQ.
  • Канал — представляет канал связи между промежуточным программным обеспечением для обмена сообщениями и приложением.
  • StreamListeners — методы обработки сообщений в bean-компонентах, которые будут автоматически вызываться для сообщения из канала после того, как MessageConverter выполнит сериализацию/десериализацию между событиями промежуточного программного обеспечения и типами объектов домена/POJO.
  • Схемы сообщений — используются для сериализации и десериализации сообщений, эти схемы могут быть статически считаны из местоположения или загружены динамически, поддерживая эволюцию типов объектов предметной области.

3.2. Шаблоны общения

Сообщения, назначенные адресатам, доставляются по шаблону обмена сообщениями Publish-Subscribe . Издатели классифицируют сообщения по темам, каждая из которых идентифицируется по имени. Подписчики проявляют интерес к одной или нескольким темам. Промежуточное ПО фильтрует сообщения, доставляя подписчикам интересующие темы.

Теперь подписчиков можно было сгруппировать. Группа потребителей — это набор подписчиков или потребителей, идентифицируемых по идентификатору группы , в рамках которого сообщения из темы или раздела темы доставляются с балансировкой нагрузки.

4. Модель программирования

В этом разделе описываются основы создания приложений Spring Cloud Stream.

4.1. Функциональное тестирование

Тестовая поддержка — это реализация связующего, которая позволяет взаимодействовать с каналами и проверять сообщения.

Давайте отправим сообщение в указанный выше сервис richLogMessage и проверим, содержит ли ответ текст «[1]:» в начале сообщения:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

@Autowired
private Processor pipe;

@Autowired
private MessageCollector messageCollector;

@Test
public void whenSendMessage_thenResponseShouldUpdateText() {
pipe.input()
.send(MessageBuilder.withPayload(new LogMessage("This is my message"))
.build());

Object payload = messageCollector.forChannel(pipe.output())
.poll()
.getPayload();

assertEquals("[1]: This is my message", payload.toString());
}
}

4.2. Пользовательские каналы

В приведенном выше примере мы использовали интерфейс Processor , предоставляемый Spring Cloud, который имеет только один входной и один выходной канал.

Если нам нужно что-то другое, например, один входной и два выходных канала, мы можем создать собственный процессор:

public interface MyProcessor {
String INPUT = "myInput";

@Input
SubscribableChannel myInput();

@Output("myOutput")
MessageChannel anOutput();

@Output
MessageChannel anotherOutput();
}

Spring предоставит нам правильную реализацию этого интерфейса. Имена каналов можно задать с помощью аннотаций, таких как @Output(“myOutput”) .

В противном случае Spring будет использовать имена методов в качестве имен каналов. Таким образом, у нас есть три канала с именами myInput , myOutput и AnotherOutput .

Теперь давайте представим, что мы хотим направить сообщения на один выход, если значение меньше 10, и на другой выход, если значение больше или равно 10:

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
if (val < 10) {
processor.anOutput().send(message(val));
} else {
processor.anotherOutput().send(message(val));
}
}

private static final <T> Message<T> message(T val) {
return MessageBuilder.withPayload(val).build();
}

4.3. Условная отправка

Используя аннотацию @StreamListener , мы также можем фильтровать сообщения, которые мы ожидаем от потребителя , используя любое условие, которое мы определяем с помощью выражений SpEL .

Например, мы могли бы использовать условную диспетчеризацию как еще один подход к маршрутизации сообщений в разные выходные потоки:

@Autowired
private MyProcessor processor;

@StreamListener(
target = MyProcessor.INPUT,
condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
processor.anOutput().send(message(val));
}

@StreamListener(
target = MyProcessor.INPUT,
condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
processor.anotherOutput().send(message(val));
}

Единственное ограничение этого подхода заключается в том, что эти методы не должны возвращать значение.

5. Настройка

Настроим приложение, которое будет обрабатывать сообщение от брокера RabbitMQ.

5.1. Конфигурация связующего

Мы можем настроить наше приложение для использования реализации связывателя по умолчанию через META-INF/spring.binders :

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Или мы можем добавить библиотеку связывателя для RabbitMQ в путь к классам, включив эту зависимость :

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>

Если реализация связывателя не предоставлена, Spring будет использовать прямую связь между каналами.

5.2. Конфигурация RabbitMQ

Чтобы настроить пример в разделе 3.1 для использования биндера RabbitMQ, нам нужно обновить application.yml, расположенный в src/main/resources :

spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
output:
destination: queue.pretty.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: <host>
port: 5672
username: <username>
password: <password>
virtual-host: /

Входная привязка будет использовать обмен с именем queue.log.messages , а выходная привязка будет использовать обмен queue.pretty.log.messages . Обе привязки будут использовать привязку с именем local_rabbit .

Обратите внимание, что нам не нужно заранее создавать обмены или очереди RabbitMQ. При запуске приложения автоматически создаются обе биржи .

Чтобы протестировать приложение, мы можем использовать сайт управления RabbitMQ для публикации сообщения. В панели Publish Message биржи queue.log.messages нам нужно ввести запрос в формате JSON.

5.3. Настройка преобразования сообщений

Spring Cloud Stream позволяет нам применять преобразование сообщений для определенных типов контента. В приведенном выше примере вместо использования формата JSON мы хотим предоставить обычный текст.

Для этого мы применим пользовательское преобразование к LogMessage с помощью MessageConverter :

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
//...

@Bean
public MessageConverter providesTextPlainMessageConverter() {
return new TextPlainMessageConverter();
}

//...
}
public class TextPlainMessageConverter extends AbstractMessageConverter {

public TextPlainMessageConverter() {
super(new MimeType("text", "plain"));
}

@Override
protected boolean supports(Class<?> clazz) {
return (LogMessage.class == clazz);
}

@Override
protected Object convertFromInternal(Message<?> message,
Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
String text = payload instanceof String
? (String) payload
: new String((byte[]) payload);
return new LogMessage(text);
}
}

После применения этих изменений, вернувшись к панели « Публикация сообщения », если мы установим для заголовка « contentTypes » значение « text/plain », а для полезной нагрузки — « Hello World », все должно работать, как и раньше.

5.4. Группы потребителей

При запуске нескольких экземпляров нашего приложения каждый раз, когда во входном канале появляется новое сообщение, все подписчики будут уведомлены .

В большинстве случаев нам нужно, чтобы сообщение обрабатывалось только один раз. Spring Cloud Stream реализует это поведение через группы потребителей.

Чтобы включить это поведение, каждая привязка потребителя может использовать свойство spring.cloud.stream.bindings.<CHANNEL>.group для указания имени группы:

spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
group: logMessageConsumers
...

6. Микросервисы, управляемые сообщениями

В этом разделе мы представляем все необходимые функции для запуска наших приложений Spring Cloud Stream в контексте микросервисов.

6.1. Масштабирование

Когда запущено несколько приложений, важно обеспечить правильное разделение данных между потребителями. Для этого Spring Cloud Stream предоставляет два свойства:

  • spring.cloud.stream.instanceCount — количество запущенных приложений
  • spring.cloud.stream.instanceIndex — индекс текущего приложения

Например, если мы развернули два экземпляра вышеуказанного приложения MyLoggerServiceApplication , свойство spring.cloud.stream.instanceCount должно быть равно 2 для обоих приложений, а свойство spring.cloud.stream.instanceIndex должно быть равно 0 и 1 соответственно.

Эти свойства устанавливаются автоматически, если мы развертываем приложения Spring Cloud Stream с помощью Spring Data Flow, как описано в этой статье .

6.2. Разделение

События домена могут быть разделенными сообщениями. Это помогает при увеличении объема хранилища и повышении производительности приложений .

Событие домена обычно имеет ключ раздела, поэтому оно оказывается в том же разделе со связанными сообщениями.

Допустим, мы хотим, чтобы сообщения журнала были разделены по первой букве в сообщении, которая будет ключом раздела, и сгруппированы в два раздела.

Будет один раздел для сообщений журнала, начинающихся с AM , и другой раздел для NZ. Это можно настроить с помощью двух свойств:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — выражение для разделения полезной нагрузки
  • spring.cloud.stream.bindings.output.producer.partitionCount — количество групп

Иногда выражение для разделения слишком сложное, чтобы записать его в одну строку. Для этих случаев мы можем написать собственную стратегию разделения, используя свойство spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass .

6.3. Индикатор здоровья

В контексте микрослужб нам также необходимо определить, когда служба не работает или начинает давать сбой . Spring Cloud Stream предоставляет свойство management.health.binders.enabled , чтобы включить индикаторы работоспособности для связывателей.

При запуске приложения мы можем запросить состояние работоспособности по адресу http://<host>:<port>/health .

7. Заключение

В этом руководстве мы представили основные концепции Spring Cloud Stream и показали, как использовать его на нескольких простых примерах с RabbitMQ. Подробнее о Spring Cloud Stream можно узнать здесь .

Исходный код этой статьи можно найти на GitHub .