Перейти к основному содержимому

Введение в MBassador

· 10 мин. чтения

1. Обзор

Проще говоря, MBassadorэто высокопроизводительная шина событий, использующая семантику публикации-подписки .

Сообщения передаются одному или нескольким одноранговым узлам без предварительного знания о количестве подписчиков или о том, как они используют сообщение.

2. Зависимость от Maven

Прежде чем мы сможем использовать библиотеку, нам нужно добавить зависимость mbassador :

<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.1</version>
</dependency>

3. Базовая обработка событий

3.1. Простой пример

Начнем с простого примера публикации сообщения:

private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;

@Before
public void prepareTests() {
dispatcher.subscribe(this);
}

@Test
public void whenStringDispatched_thenHandleString() {
dispatcher.post("TestString").now();

assertNotNull(messageString);
assertEquals("TestString", messageString);
}

@Handler
public void handleString(String message) {
messageString = message;
}

В верхней части этого тестового класса мы видим создание MBassador с его конструктором по умолчанию. Затем в методе @Before мы вызываем subscribe() и передаем ссылку на сам класс.

В subscribe() диспетчер проверяет подписчика на наличие аннотаций @Handler .

И в первом тесте мы вызываем dispatcher.post(…).now() для отправки сообщения, что приводит к вызову handleString() .

Этот первоначальный тест демонстрирует несколько важных концепций. Любой объект может быть подписчиком, если у него есть один или несколько методов, аннотированных с помощью @Handler . Подписчик может иметь любое количество обработчиков.

Мы используем тестовые объекты, которые подписываются сами на себя для простоты, но в большинстве производственных сценариев диспетчеры сообщений относятся к классам, отличным от классов потребителей.

Методы-обработчики имеют только один входной параметр — сообщение и не могут генерировать проверенные исключения.

Подобно методу subscribe() , метод post принимает любой Object . Этот объект доставляется подписчикам.

Когда сообщение публикуется, оно доставляется всем слушателям, которые подписались на тип сообщения.

Давайте добавим еще один обработчик сообщений и отправим сообщение другого типа:

private Integer messageInteger; 

@Test
public void whenIntegerDispatched_thenHandleInteger() {
dispatcher.post(42).now();

assertNull(messageString);
assertNotNull(messageInteger);
assertTrue(42 == messageInteger);
}

@Handler
public void handleInteger(Integer message) {
messageInteger = message;
}

Как и ожидалось, когда мы отправляем Integer , `вызывается handleInteger()` , а `handleString()` — нет. Один диспетчер может использоваться для отправки более одного типа сообщений.

3.2. Мертвые сообщения

Итак, куда девается сообщение, если для него нет обработчика? Давайте добавим новый обработчик событий, а затем отправим сообщение третьего типа:

private Object deadEvent; 

@Test
public void whenLongDispatched_thenDeadEvent() {
dispatcher.post(42L).now();

assertNull(messageString);
assertNull(messageInteger);
assertNotNull(deadEvent);
assertTrue(deadEvent instanceof Long);
assertTrue(42L == (Long) deadEvent);
}

@Handler
public void handleDeadEvent(DeadMessage message) {
deadEvent = message.getMessage();
}

В этом тесте мы отправляем Long вместо Integer. Ни handleInteger() , ни handleString() не вызываются, но вызывается handleDeadEvent() .

Когда для сообщения нет обработчиков, оно помещается в объект DeadMessage . Так как мы добавили обработчик для Deadmessage , мы его захватываем.

DeadMessage можно спокойно игнорировать; если приложению не нужно отслеживать мертвые сообщения, их можно разрешить никуда не девать.

4. Использование иерархии событий

Отправка событий String и Integer ограничена. Давайте создадим несколько классов сообщений:

public class Message {}

public class AckMessage extends Message {}

public class RejectMessage extends Message {
int code;

// setters and getters
}

У нас есть простой базовый класс и два класса, которые его расширяют.

4.1. Отправка сообщения базового класса ``

Начнем с событий Message :

private MBassador<Message> dispatcher = new MBassador<>();

private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;

@Before
public void prepareTests() {
dispatcher.subscribe(this);
}

@Test
public void whenMessageDispatched_thenMessageHandled() {
dispatcher.post(new Message()).now();
assertNotNull(message);
assertNull(ackMessage);
assertNull(rejectMessage);
}

@Handler
public void handleMessage(Message message) {
this.message = message;
}

@Handler
public void handleRejectMessage(RejectMessage message) {
rejectMessage = message;
}

@Handler
public void handleAckMessage(AckMessage message) {
ackMessage = message;
}

Откройте для себя MBassador — высокопроизводительный автобус для проведения мероприятий. Это ограничивает нас использованием сообщений , но добавляет дополнительный уровень безопасности типов.

Когда мы отправляем сообщение , handleMessage() получает его. Два других обработчика этого не делают.

4.2. Отправка сообщения подкласса

Давайте отправим RejectMessage :

@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
dispatcher.post(new RejectMessage()).now();

assertNotNull(message);
assertNotNull(rejectMessage);
assertNull(ackMessage);
}

Когда мы отправляем сообщение RejectMessage , его получают и handleRejectMessage(), и handleMessage() .

Поскольку RejectMessage расширяет Message, обработчик Message получил его в дополнение к обработчику RejectMessage . ``

Давайте проверим это поведение с помощью AckMessage :

@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
dispatcher.post(new AckMessage()).now();

assertNotNull(message);
assertNotNull(ackMessage);
assertNull(rejectMessage);
}

Как мы и ожидали, когда мы отправляем AckMessage , его получают и handleAckMessage() , и handleMessage() .

5. Фильтрация сообщений

Организация сообщений по типу уже является мощной функцией, но мы можем еще больше фильтровать их.

5.1. Фильтровать по классу и подклассу

Когда мы отправляли RejectMessage или AckMessage , мы получали событие как в обработчике событий для определенного типа, так и в базовом классе.

Мы можем решить эту проблему иерархии типов, сделав Message абстрактным и создав класс, такой как GenericMessage . Но что, если у нас нет такой роскоши?

Мы можем использовать фильтры сообщений:

private Message baseMessage;
private Message subMessage;

@Test
public void whenMessageDispatched_thenMessageFiltered() {
dispatcher.post(new Message()).now();

assertNotNull(baseMessage);
assertNull(subMessage);
}

@Test
public void whenRejectDispatched_thenRejectFiltered() {
dispatcher.post(new RejectMessage()).now();

assertNotNull(subMessage);
assertNull(baseMessage);
}

@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
this.baseMessage = message;
}

@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
this.subMessage = message;
}

Параметр фильтров для аннотации @Handler принимает класс , который реализует IMessageFilter . Библиотека предлагает два примера:

Filters.RejectSubtypes делает то , что следует из его названия: он отфильтровывает любые подтипы. В этом случае мы видим, что RejectMessage не обрабатывается handleBaseMessage().

Filters.SubtypesOnly также делает то , что следует из его названия: он отфильтровывает любые базовые типы. В этом случае мы видим, что Message не обрабатывается функцией handleSubMessage().

5.2. IMessageFilter

Оба фильтра Filters.RejectSubtypes и Filters.SubtypesOnly реализуют IMessageFilter .

RejectSubTypes сравнивает класс сообщения с его определенными типами сообщений и разрешает только сообщения, соответствующие одному из его типов, в отличие от любых подклассов.

5.3. Фильтр с условиями

К счастью, есть более простой способ фильтрации сообщений. MBassador поддерживает подмножество выражений Java EL в качестве условий для фильтрации сообщений.

Давайте отфильтруем сообщение String на основе его длины:

private String testString;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();

assertNull(testString);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}

«Фубар!» сообщение состоит из семи символов и фильтруется. Давайте отправим более короткую строку :

@Test
public void whenShortStringDispatched_thenStringHandled() {
dispatcher.post("foobar").now();

assertNotNull(testString);
}

Теперь «foobar» состоит всего из шести символов и пропускается.

Наш RejectMessage содержит поле с аксессором. Напишем для этого фильтр:

private RejectMessage rejectMessage;

@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {

RejectMessage testReject = new RejectMessage();
testReject.setCode(-1);

dispatcher.post(testReject).now();

assertNull(rejectMessage);
assertNotNull(subMessage);
assertEquals(-1, ((RejectMessage) subMessage).getCode());
}

@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
this.rejectMessage = rejectMessage;
}

Здесь снова мы можем запросить метод объекта и либо отфильтровать сообщение, либо нет.

5.4. Захват отфильтрованных сообщений

Как и в случае с DeadEvents, мы можем захотеть захватить и обработать отфильтрованные сообщения. Также существует специальный механизм для захвата отфильтрованных событий. Отфильтрованные события обрабатываются иначе, чем «мертвые» события.

Давайте напишем тест, иллюстрирующий это:

private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();

assertNull(testString);
assertNotNull(filteredMessage);
assertTrue(filteredMessage.getMessage() instanceof String);
assertNull(deadMessage);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}

@Handler
public void handleFilterMessage(FilteredMessage message) {
this.filteredMessage = message;
}

@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
this.deadMessage = deadMessage;
}

С добавлением обработчика FilteredMessage мы можем отслеживать строки , отфильтрованные из-за их длины. filterMessage содержит нашу слишком длинную строку , в то время как deadMessage остается нулевым.

6. Асинхронная отправка и обработка сообщений

До сих пор во всех наших примерах использовалась синхронная отправка сообщений; когда мы вызывали post.now() , сообщения доставлялись каждому обработчику в том же потоке, из которого мы вызывали post() .

6.1. Асинхронная отправка

MBassador.post () возвращает SyncAsyncPostCommand . Этот класс предлагает несколько методов, в том числе:

  • now() — отправлять сообщения синхронно; вызов будет заблокирован до тех пор, пока все сообщения не будут доставлены
  • asynchronously() — выполняет публикацию сообщения асинхронно

Давайте используем асинхронную отправку в примере класса. Мы будем использовать Awaitility в этих тестах, чтобы упростить код:

private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenAsyncDispatched_thenMessageReceived() {
dispatcher.post("foobar").asynchronously();

await().untilAtomic(ready, equalTo(true));
assertNotNull(testString);
}

@Handler
public void handleStringMessage(String message) {
this.testString = message;
ready.set(true);
}

В этом тесте мы вызываем asynchronously() и используем AtomicBoolean в качестве флага с await() для ожидания доставки сообщения потоком доставки.

Если мы закомментируем вызов await() , мы рискуем провалить тест, потому что мы проверяем testString до завершения потока доставки.

6.2. Вызов асинхронного обработчика

Асинхронная отправка позволяет поставщику сообщений вернуться к обработке сообщений до того, как сообщения будут доставлены каждому обработчику, но он по-прежнему вызывает каждый обработчик по порядку, и каждый обработчик должен ждать завершения предыдущего.

Это может привести к проблемам, если один обработчик выполняет дорогостоящую операцию.

MBassador предоставляет механизм асинхронного вызова обработчика. Обработчики, настроенные для этого, получают сообщения в своем потоке:

private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenHandlerAsync_thenHandled() {
dispatcher.post(42).now();

await().untilAtomic(ready, equalTo(true));
assertNotNull(testInteger);
assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}

@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {

this.invocationThreadName = Thread.currentThread().getName();
this.testInteger = message;
ready.set(true);
}

Обработчики могут запрашивать асинхронный вызов с помощью свойства delivery = Invoke.Asynchronously в аннотации Handler . Мы проверяем это в нашем тесте, сравнивая имена потоков в методе диспетчеризации и обработчике.

7. Настройка MBassador

До сих пор мы использовали экземпляр MBassador с конфигурацией по умолчанию. Поведение диспетчера можно изменить с помощью аннотаций, аналогичных тем, которые мы видели до сих пор; мы рассмотрим еще несколько, чтобы закончить этот урок.

7.1. Обработка исключений

Обработчики не могут определять проверенные исключения. Вместо этого диспетчеру можно предоставить IPublicationErrorHandler в качестве аргумента его конструктору:

public class MBassadorConfigurationTest
implements IPublicationErrorHandler {

private MBassador dispatcher;
private String messageString;
private Throwable errorCause;

@Before
public void prepareTests() {
dispatcher = new MBassador<String>(this);
dispatcher.subscribe(this);
}

@Test
public void whenErrorOccurs_thenErrorHandler() {
dispatcher.post("Error").now();

assertNull(messageString);
assertNotNull(errorCause);
}

@Test
public void whenNoErrorOccurs_thenStringHandler() {
dispatcher.post("Error").now();

assertNull(errorCause);
assertNotNull(messageString);
}

@Handler
public void handleString(String message) {
if ("Error".equals(message)) {
throw new Error("BOOM");
}
messageString = message;
}

@Override
public void handleError(PublicationError error) {
errorCause = error.getCause().getCause();
}
}

Когда handleString() выдает ошибку, она сохраняется в errorCause.

7.2. Приоритет обработчика

Обработчики вызываются в порядке, обратном их добавлению, но это не то поведение, на которое мы хотим полагаться. Даже имея возможность вызывать обработчики в своих потоках, нам может понадобиться знать, в каком порядке они будут вызываться.

Мы можем установить приоритет обработчика явно:

private LinkedList<Integer> list = new LinkedList<>();

@Test
public void whenRejectDispatched_thenPriorityHandled() {
dispatcher.post(new RejectMessage()).now();

// Items should pop() off in reverse priority order
assertTrue(1 == list.pop());
assertTrue(3 == list.pop());
assertTrue(5 == list.pop());
}

@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
list.push(5);
}

@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
list.push(3);
}

@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage)
logger.error("Reject handler #3");
list.push(3);
}

@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
list.push(1);
}

Обработчики вызываются от наивысшего приоритета к низшему. Обработчики с приоритетом по умолчанию, равным нулю, вызываются последними. Мы видим, что номера обработчиков pop() отключаются в обратном порядке.

7.3. Отказ от подтипов, простой способ

Что случилось с handleMessage() в приведенном выше тесте? Нам не нужно использовать RejectSubTypes.class для фильтрации наших подтипов.

RejectSubTypes — это логический флаг, который обеспечивает ту же фильтрацию, что и класс, но с более высокой производительностью, чем реализация IMessageFilter .

Однако нам по-прежнему нужно использовать реализацию на основе фильтров только для приема подтипов.

8. Заключение

MBassador — это простая и понятная библиотека для передачи сообщений между объектами. Сообщения могут быть организованы различными способами и могут быть отправлены синхронно или асинхронно.

И, как всегда, пример доступен в этом проекте GitHub .