Перейти к основному содержимому

Apache RocketMQ с Spring Boot

· 5 мин. чтения

1. Введение

В этом руководстве мы создадим производителя и потребителя сообщений, используя Spring Boot и Apache RocketMQ, платформу распределенного обмена сообщениями и потоковой передачи данных с открытым исходным кодом.

2. Зависимости

Для проектов Maven нам нужно добавить зависимость RocketMQ Spring Boot Starter :

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>

3. Создание сообщений

В нашем примере мы создадим базовый генератор сообщений, который будет отправлять события всякий раз, когда пользователь добавляет или удаляет товар из корзины.

Во-первых, давайте настроим расположение нашего сервера и имя группы в нашем application.properties :

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

Обратите внимание, что если бы у нас было более одного сервера имен, мы могли бы перечислить их как host:port;host:port .

Теперь для простоты мы создадим приложение CommandLineRunner и сгенерируем несколько событий во время запуска приложения:

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {

@Autowired
private RocketMQTemplate rocketMQTemplate;

public static void main(String[] args) {
SpringApplication.run(CartEventProducer.class, args);
}

public void run(String... args) throws Exception {
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
}
}

CartItemEvent состоит всего из двух свойств — идентификатора товара и количества:

class CartItemEvent {
private String itemId;
private int quantity;

// constructor, getters and setters
}

В приведенном выше примере мы используем метод convertAndSend() , универсальный метод, определенный абстрактным классом AbstractMessageSendingTemplate , для отправки событий корзины. Он принимает два параметра: пункт назначения, которым в нашем случае является название темы, и полезную нагрузку сообщения.

4. Потребитель сообщений

Использование сообщений RocketMQ так же просто, как создание компонента Spring с аннотацией @RocketMQMessageListener и реализация интерфейса RocketMQListener :

@SpringBootApplication
public class CartEventConsumer {

public static void main(String[] args) {
SpringApplication.run(CartEventConsumer.class, args);
}

@Service
@RocketMQMessageListener(
topic = "cart-item-add-topic",
consumerGroup = "cart-consumer_cart-item-add-topic"
)
public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent addItemEvent) {
log.info("Adding item: {}", addItemEvent);
// additional logic
}
}

@Service
@RocketMQMessageListener(
topic = "cart-item-removed-topic",
consumerGroup = "cart-consumer_cart-item-removed-topic"
)
public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent removeItemEvent) {
log.info("Removing item: {}", removeItemEvent);
// additional logic
}
}
}

Нам нужно создать отдельный компонент для каждой темы сообщения, которую мы слушаем. В каждом из этих слушателей мы определяем имя темы и имя группы потребителей через аннотацию @ RocketMQMessageListener .

5. Синхронная и асинхронная передача

В предыдущих примерах мы использовали метод convertAndSend для отправки наших сообщений. Однако у нас есть и другие варианты.

Мы могли бы, например, вызвать syncSend , который отличается от convertAndSend, потому что он возвращает объект SendResult .

Его можно использовать, например, для проверки успешности отправки нашего сообщения или получения его идентификатора:

public void run(String... args) throws Exception { 
SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("bike", 1));
SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("computer", 2));
SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic",
new CartItemEvent("bike", 1));
}

Как и convertAndSend, этот метод возвращается только после завершения процедуры отправки.

Мы должны использовать синхронную передачу в случаях, требующих высокой надежности, таких как важные уведомления или SMS-уведомления.

С другой стороны, вместо этого мы можем захотеть отправить сообщение асинхронно и получить уведомление, когда отправка завершится.

Мы можем сделать это с помощью asyncSend , который принимает SendCallback в качестве параметра и немедленно возвращает результат:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.error("Successfully sent cart item");
}

@Override
public void onException(Throwable throwable) {
log.error("Exception during cart item sending", throwable);
}
});

Мы используем асинхронную передачу в случаях, когда требуется высокая пропускная способность.

Наконец, для сценариев с очень высокими требованиями к пропускной способности мы можем использовать sendOneWay вместо asyncSend . sendOneWay отличается от asyncSend тем, что не гарантирует отправку сообщения.

Одностороннюю передачу также можно использовать для обычных случаев надежности, таких как сбор журналов.

6. Отправка сообщений в транзакции ** **

RocketMQ предоставляет нам возможность отправлять сообщения внутри транзакции. Мы можем сделать это с помощью метода sendInTransaction() :

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

Также мы должны реализовать интерфейс RocketMQLocalTransactionListener :

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.COMMIT;
}
}

В sendMessageInTransaction() первым параметром является имя транзакции. Оно должно совпадать с полем участника @RocketMQTransactionListener txProducerGroup.

7. Конфигурация поставщика сообщений

Мы также можем настроить аспекты самого производителя сообщений:

  • Rocketmq.producer.send-message-timeout : время ожидания отправки сообщения в миллисекундах — значение по умолчанию — 3000.
  • Rocketmq.producer.compress-message-body-threshold : Порог, выше которого RocketMQ будет сжимать сообщения — значение по умолчанию — 1024.
  • Rocketmq.producer.max-message-size : максимальный размер сообщения в байтах — значение по умолчанию — 4096.
  • Rocketmq.producer.retry-times-when-send-async-failed : максимальное количество внутренних попыток в асинхронном режиме перед отправкой ошибки — значение по умолчанию равно 2.
  • Rocketmq.producer.retry-next-server : указывает, следует ли повторить попытку другого брокера при внутренней отправке сообщения об ошибке — значение по умолчанию — false .
  • Rocketmq.producer.retry-times-when-send-failed : максимальное количество внутренних попыток в асинхронном режиме перед ошибкой отправки — значение по умолчанию — 2.

8. Заключение

В этой статье мы узнали, как отправлять и использовать сообщения с помощью Apache RocketMQ и Spring Boot. Как всегда весь исходный код доступен на GitHub .