1. Введение
В этом руководстве мы создадим производителя и потребителя сообщений, используя Spring Boot и Apache RocketMQ, платформу распределенного обмена сообщениями и потоковой передачи данных с открытым исходным кодом.
2. Зависимости
Для проектов Maven нам нужно добавить зависимость RocketMQ Spring Boot Starter :
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
3. Создание сообщений
В нашем примере мы создадим базовый генератор сообщений, который будет отправлять события всякий раз, когда пользователь добавляет или удаляет товар из корзины.
Во-первых, давайте настроим расположение нашего сервера и имя группы в нашем application.properties
:
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group
Обратите внимание, что если бы у нас было более одного сервера имен, мы могли бы перечислить их как host:port;host:port
.
Теперь для простоты мы создадим приложение CommandLineRunner
и сгенерируем несколько событий во время запуска приложения:
@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(CartEventProducer.class, args);
}
public void run(String... args) throws Exception {
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
}
}
CartItemEvent состоит всего
из двух свойств — идентификатора товара и количества:
class CartItemEvent {
private String itemId;
private int quantity;
// constructor, getters and setters
}
В приведенном выше примере мы используем метод convertAndSend()
, универсальный метод, определенный абстрактным классом AbstractMessageSendingTemplate
, для отправки событий корзины. Он принимает два параметра: пункт назначения, которым в нашем случае является название темы, и полезную нагрузку сообщения.
4. Потребитель сообщений
Использование сообщений RocketMQ так же просто, как создание компонента Spring с аннотацией @RocketMQMessageListener
и реализация интерфейса RocketMQListener
:
@SpringBootApplication
public class CartEventConsumer {
public static void main(String[] args) {
SpringApplication.run(CartEventConsumer.class, args);
}
@Service
@RocketMQMessageListener(
topic = "cart-item-add-topic",
consumerGroup = "cart-consumer_cart-item-add-topic"
)
public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent addItemEvent) {
log.info("Adding item: {}", addItemEvent);
// additional logic
}
}
@Service
@RocketMQMessageListener(
topic = "cart-item-removed-topic",
consumerGroup = "cart-consumer_cart-item-removed-topic"
)
public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent removeItemEvent) {
log.info("Removing item: {}", removeItemEvent);
// additional logic
}
}
}
Нам нужно создать отдельный компонент для каждой темы сообщения, которую мы слушаем. В каждом из этих слушателей мы определяем имя темы и имя группы потребителей через аннотацию @
RocketMQMessageListener
.
5. Синхронная и асинхронная передача
В предыдущих примерах мы использовали метод convertAndSend
для отправки наших сообщений. Однако у нас есть и другие варианты.
Мы могли бы, например, вызвать syncSend
, который отличается от convertAndSend,
потому что он возвращает объект SendResult .
Его можно использовать, например, для проверки успешности отправки нашего сообщения или получения его идентификатора:
public void run(String... args) throws Exception {
SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("bike", 1));
SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("computer", 2));
SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic",
new CartItemEvent("bike", 1));
}
Как и convertAndSend,
этот метод возвращается только после завершения процедуры отправки.
Мы должны использовать синхронную передачу в случаях, требующих высокой надежности, таких как важные уведомления или SMS-уведомления.
С другой стороны, вместо этого мы можем захотеть отправить сообщение асинхронно и получить уведомление, когда отправка завершится.
Мы можем сделать это с помощью asyncSend
, который принимает SendCallback
в качестве параметра и немедленно возвращает результат:
rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.error("Successfully sent cart item");
}
@Override
public void onException(Throwable throwable) {
log.error("Exception during cart item sending", throwable);
}
});
Мы используем асинхронную передачу в случаях, когда требуется высокая пропускная способность.
Наконец, для сценариев с очень высокими требованиями к пропускной способности мы можем использовать sendOneWay
вместо asyncSend
. sendOneWay
отличается от asyncSend
тем, что не гарантирует отправку сообщения.
Одностороннюю передачу также можно использовать для обычных случаев надежности, таких как сбор журналов.
6. Отправка сообщений в транзакции ** **
RocketMQ предоставляет нам возможность отправлять сообщения внутри транзакции. Мы можем сделать это с помощью метода sendInTransaction()
:
MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);
Также мы должны реализовать интерфейс RocketMQLocalTransactionListener
:
@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.COMMIT;
}
}
В sendMessageInTransaction()
первым параметром является имя транзакции. Оно должно совпадать с полем участника @RocketMQTransactionListener
txProducerGroup.
7. Конфигурация поставщика сообщений
Мы также можем настроить аспекты самого производителя сообщений:
Rocketmq.producer.send-message-timeout
: время ожидания отправки сообщения в миллисекундах — значение по умолчанию — 3000.Rocketmq.producer.compress-message-body-threshold
: Порог, выше которого RocketMQ будет сжимать сообщения — значение по умолчанию — 1024.Rocketmq.producer.max-message-size
: максимальный размер сообщения в байтах — значение по умолчанию — 4096.Rocketmq.producer.retry-times-when-send-async-failed
: максимальное количество внутренних попыток в асинхронном режиме перед отправкой ошибки — значение по умолчанию равно 2.Rocketmq.producer.retry-next-server
: указывает, следует ли повторить попытку другого брокера при внутренней отправке сообщения об ошибке — значение по умолчанию —false
.Rocketmq.producer.retry-times-when-send-failed
: максимальное количество внутренних попыток в асинхронном режиме перед ошибкой отправки — значение по умолчанию — 2.
8. Заключение
В этой статье мы узнали, как отправлять и использовать сообщения с помощью Apache RocketMQ и Spring Boot. Как всегда весь исходный код доступен на GitHub .