1. Обзор
Проще говоря, MBassador — это высокопроизводительная шина событий, использующая семантику публикации-подписки .
Сообщения передаются одному или нескольким одноранговым узлам без предварительного знания о количестве подписчиков или о том, как они используют сообщение.
2. Зависимость от Maven
Прежде чем мы сможем использовать библиотеку, нам нужно добавить зависимость mbassador :
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.1</version>
</dependency>
3. Базовая обработка событий
3.1. Простой пример
Начнем с простого примера публикации сообщения:
private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;
@Before
public void prepareTests() {
dispatcher.subscribe(this);
}
@Test
public void whenStringDispatched_thenHandleString() {
dispatcher.post("TestString").now();
assertNotNull(messageString);
assertEquals("TestString", messageString);
}
@Handler
public void handleString(String message) {
messageString = message;
}
В верхней части этого тестового класса мы видим создание MBassador
с его конструктором по умолчанию. Затем в методе @Before
мы вызываем subscribe()
и передаем ссылку на сам класс.
В subscribe()
диспетчер проверяет подписчика на наличие аннотаций @Handler
.
И в первом тесте мы вызываем dispatcher.post(…).now()
для отправки сообщения, что приводит к вызову handleString()
.
Этот первоначальный тест демонстрирует несколько важных концепций. Любой объект
может быть подписчиком, если у него есть один или несколько методов, аннотированных с помощью @Handler
. Подписчик может иметь любое количество обработчиков.
Мы используем тестовые объекты, которые подписываются сами на себя для простоты, но в большинстве производственных сценариев диспетчеры сообщений относятся к классам, отличным от классов потребителей.
Методы-обработчики имеют только один входной параметр — сообщение и не могут генерировать проверенные исключения.
Подобно методу subscribe()
, метод post принимает любой Object
. Этот объект
доставляется подписчикам.
Когда сообщение публикуется, оно доставляется всем слушателям, которые подписались на тип сообщения.
Давайте добавим еще один обработчик сообщений и отправим сообщение другого типа:
private Integer messageInteger;
@Test
public void whenIntegerDispatched_thenHandleInteger() {
dispatcher.post(42).now();
assertNull(messageString);
assertNotNull(messageInteger);
assertTrue(42 == messageInteger);
}
@Handler
public void handleInteger(Integer message) {
messageInteger = message;
}
Как и ожидалось, когда мы отправляем Integer
, `вызывается handleInteger()` , а `handleString()` — нет. Один диспетчер может использоваться для отправки более одного типа сообщений.
3.2. Мертвые сообщения
Итак, куда девается сообщение, если для него нет обработчика? Давайте добавим новый обработчик событий, а затем отправим сообщение третьего типа:
private Object deadEvent;
@Test
public void whenLongDispatched_thenDeadEvent() {
dispatcher.post(42L).now();
assertNull(messageString);
assertNull(messageInteger);
assertNotNull(deadEvent);
assertTrue(deadEvent instanceof Long);
assertTrue(42L == (Long) deadEvent);
}
@Handler
public void handleDeadEvent(DeadMessage message) {
deadEvent = message.getMessage();
}
В этом тесте мы отправляем Long
вместо Integer.
Ни handleInteger()
, ни handleString()
не вызываются, но вызывается handleDeadEvent()
.
Когда для сообщения нет обработчиков, оно помещается в объект DeadMessage
. Так как мы добавили обработчик для Deadmessage
, мы его захватываем.
DeadMessage
можно спокойно игнорировать; если приложению не нужно отслеживать мертвые сообщения, их можно разрешить никуда не девать.
4. Использование иерархии событий
Отправка событий String
и Integer
ограничена. Давайте создадим несколько классов сообщений:
public class Message {}
public class AckMessage extends Message {}
public class RejectMessage extends Message {
int code;
// setters and getters
}
У нас есть простой базовый класс и два класса, которые его расширяют.
4.1. Отправка сообщения
базового класса ``
Начнем с событий Message :
private MBassador<Message> dispatcher = new MBassador<>();
private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;
@Before
public void prepareTests() {
dispatcher.subscribe(this);
}
@Test
public void whenMessageDispatched_thenMessageHandled() {
dispatcher.post(new Message()).now();
assertNotNull(message);
assertNull(ackMessage);
assertNull(rejectMessage);
}
@Handler
public void handleMessage(Message message) {
this.message = message;
}
@Handler
public void handleRejectMessage(RejectMessage message) {
rejectMessage = message;
}
@Handler
public void handleAckMessage(AckMessage message) {
ackMessage = message;
}
Откройте для себя MBassador — высокопроизводительный автобус для проведения мероприятий. Это ограничивает нас использованием сообщений
, но добавляет дополнительный уровень безопасности типов.
Когда мы отправляем сообщение
, handleMessage()
получает его. Два других обработчика этого не делают.
4.2. Отправка сообщения подкласса
Давайте отправим RejectMessage
:
@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
dispatcher.post(new RejectMessage()).now();
assertNotNull(message);
assertNotNull(rejectMessage);
assertNull(ackMessage);
}
Когда мы отправляем сообщение RejectMessage
, его получают и handleRejectMessage(),
и handleMessage()
.
Поскольку RejectMessage
расширяет Message,
обработчик Message
получил его в дополнение к обработчику
RejectMessage
. ``
Давайте проверим это поведение с помощью AckMessage
:
@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
dispatcher.post(new AckMessage()).now();
assertNotNull(message);
assertNotNull(ackMessage);
assertNull(rejectMessage);
}
Как мы и ожидали, когда мы отправляем AckMessage
, его получают и handleAckMessage()
, и handleMessage()
.
5. Фильтрация сообщений
Организация сообщений по типу уже является мощной функцией, но мы можем еще больше фильтровать их.
5.1. Фильтровать по классу и подклассу
Когда мы отправляли RejectMessage
или AckMessage
, мы получали событие как в обработчике событий для определенного типа, так и в базовом классе.
Мы можем решить эту проблему иерархии типов, сделав Message
абстрактным и создав класс, такой как GenericMessage
. Но что, если у нас нет такой роскоши?
Мы можем использовать фильтры сообщений:
private Message baseMessage;
private Message subMessage;
@Test
public void whenMessageDispatched_thenMessageFiltered() {
dispatcher.post(new Message()).now();
assertNotNull(baseMessage);
assertNull(subMessage);
}
@Test
public void whenRejectDispatched_thenRejectFiltered() {
dispatcher.post(new RejectMessage()).now();
assertNotNull(subMessage);
assertNull(baseMessage);
}
@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
this.baseMessage = message;
}
@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
this.subMessage = message;
}
Параметр фильтров
для аннотации @Handler
принимает класс
, который реализует IMessageFilter
. Библиотека предлагает два примера:
Filters.RejectSubtypes делает то
, что следует из его названия: он отфильтровывает любые подтипы. В этом случае мы видим, что RejectMessage
не обрабатывается handleBaseMessage().
Filters.SubtypesOnly также делает то
, что следует из его названия: он отфильтровывает любые базовые типы. В этом случае мы видим, что Message
не обрабатывается функцией handleSubMessage().
5.2. IMessageFilter
Оба фильтра
Filters.RejectSubtypes и Filters.SubtypesOnly
реализуют IMessageFilter
.
RejectSubTypes
сравнивает класс сообщения с его определенными типами сообщений и разрешает только сообщения, соответствующие одному из его типов, в отличие от любых подклассов.
5.3. Фильтр с условиями
К счастью, есть более простой способ фильтрации сообщений. MBassador поддерживает подмножество выражений Java EL в качестве условий для фильтрации сообщений.
Давайте отфильтруем сообщение String
на основе его длины:
private String testString;
@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();
assertNull(testString);
}
@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}
«Фубар!» сообщение состоит из семи символов и фильтруется. Давайте отправим более короткую строку
:
@Test
public void whenShortStringDispatched_thenStringHandled() {
dispatcher.post("foobar").now();
assertNotNull(testString);
}
Теперь «foobar» состоит всего из шести символов и пропускается.
Наш RejectMessage
содержит поле с аксессором. Напишем для этого фильтр:
private RejectMessage rejectMessage;
@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {
RejectMessage testReject = new RejectMessage();
testReject.setCode(-1);
dispatcher.post(testReject).now();
assertNull(rejectMessage);
assertNotNull(subMessage);
assertEquals(-1, ((RejectMessage) subMessage).getCode());
}
@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
this.rejectMessage = rejectMessage;
}
Здесь снова мы можем запросить метод объекта и либо отфильтровать сообщение, либо нет.
5.4. Захват отфильтрованных сообщений
Как и в случае с DeadEvents,
мы можем захотеть захватить и обработать отфильтрованные сообщения. Также существует специальный механизм для захвата отфильтрованных событий. Отфильтрованные события обрабатываются иначе, чем «мертвые» события.
Давайте напишем тест, иллюстрирующий это:
private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;
@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();
assertNull(testString);
assertNotNull(filteredMessage);
assertTrue(filteredMessage.getMessage() instanceof String);
assertNull(deadMessage);
}
@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}
@Handler
public void handleFilterMessage(FilteredMessage message) {
this.filteredMessage = message;
}
@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
this.deadMessage = deadMessage;
}
С добавлением обработчика FilteredMessage
мы можем отслеживать строки
, отфильтрованные из-за их длины. filterMessage содержит нашу
слишком длинную строку
, в то время как deadMessage
остается нулевым.
6. Асинхронная отправка и обработка сообщений
До сих пор во всех наших примерах использовалась синхронная отправка сообщений; когда мы вызывали post.now()
, сообщения доставлялись каждому обработчику в том же потоке, из которого мы вызывали post()
.
6.1. Асинхронная отправка
MBassador.post ()
возвращает SyncAsyncPostCommand . Этот класс предлагает несколько методов, в том числе:
now()
— отправлять сообщения синхронно; вызов будет заблокирован до тех пор, пока все сообщения не будут доставленыasynchronously()
— выполняет публикацию сообщения асинхронно
Давайте используем асинхронную отправку в примере класса. Мы будем использовать Awaitility в этих тестах, чтобы упростить код:
private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);
@Test
public void whenAsyncDispatched_thenMessageReceived() {
dispatcher.post("foobar").asynchronously();
await().untilAtomic(ready, equalTo(true));
assertNotNull(testString);
}
@Handler
public void handleStringMessage(String message) {
this.testString = message;
ready.set(true);
}
В этом тесте мы вызываем asynchronously()
и используем AtomicBoolean
в качестве флага с await()
для ожидания доставки сообщения потоком доставки.
Если мы закомментируем вызов await()
, мы рискуем провалить тест, потому что мы проверяем testString
до завершения потока доставки.
6.2. Вызов асинхронного обработчика
Асинхронная отправка позволяет поставщику сообщений вернуться к обработке сообщений до того, как сообщения будут доставлены каждому обработчику, но он по-прежнему вызывает каждый обработчик по порядку, и каждый обработчик должен ждать завершения предыдущего.
Это может привести к проблемам, если один обработчик выполняет дорогостоящую операцию.
MBassador предоставляет механизм асинхронного вызова обработчика. Обработчики, настроенные для этого, получают сообщения в своем потоке:
private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);
@Test
public void whenHandlerAsync_thenHandled() {
dispatcher.post(42).now();
await().untilAtomic(ready, equalTo(true));
assertNotNull(testInteger);
assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}
@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
this.invocationThreadName = Thread.currentThread().getName();
this.testInteger = message;
ready.set(true);
}
Обработчики могут запрашивать асинхронный вызов с помощью свойства delivery = Invoke.Asynchronously
в аннотации Handler .
Мы проверяем это в нашем тесте, сравнивая имена потоков
в методе диспетчеризации и обработчике.
7. Настройка MBassador
До сих пор мы использовали экземпляр MBassador с конфигурацией по умолчанию. Поведение диспетчера можно изменить с помощью аннотаций, аналогичных тем, которые мы видели до сих пор; мы рассмотрим еще несколько, чтобы закончить этот урок.
7.1. Обработка исключений
Обработчики не могут определять проверенные исключения. Вместо этого диспетчеру можно предоставить IPublicationErrorHandler
в качестве аргумента его конструктору:
public class MBassadorConfigurationTest
implements IPublicationErrorHandler {
private MBassador dispatcher;
private String messageString;
private Throwable errorCause;
@Before
public void prepareTests() {
dispatcher = new MBassador<String>(this);
dispatcher.subscribe(this);
}
@Test
public void whenErrorOccurs_thenErrorHandler() {
dispatcher.post("Error").now();
assertNull(messageString);
assertNotNull(errorCause);
}
@Test
public void whenNoErrorOccurs_thenStringHandler() {
dispatcher.post("Error").now();
assertNull(errorCause);
assertNotNull(messageString);
}
@Handler
public void handleString(String message) {
if ("Error".equals(message)) {
throw new Error("BOOM");
}
messageString = message;
}
@Override
public void handleError(PublicationError error) {
errorCause = error.getCause().getCause();
}
}
Когда handleString()
выдает ошибку,
она сохраняется в errorCause.
7.2. Приоритет обработчика
Обработчики
вызываются в порядке, обратном их добавлению, но это не то поведение, на которое мы хотим полагаться. Даже имея возможность вызывать обработчики в своих потоках, нам может понадобиться знать, в каком порядке они будут вызываться.
Мы можем установить приоритет обработчика явно:
private LinkedList<Integer> list = new LinkedList<>();
@Test
public void whenRejectDispatched_thenPriorityHandled() {
dispatcher.post(new RejectMessage()).now();
// Items should pop() off in reverse priority order
assertTrue(1 == list.pop());
assertTrue(3 == list.pop());
assertTrue(5 == list.pop());
}
@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
list.push(5);
}
@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
list.push(3);
}
@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage)
logger.error("Reject handler #3");
list.push(3);
}
@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
list.push(1);
}
Обработчики вызываются от наивысшего приоритета к низшему. Обработчики с приоритетом по умолчанию, равным нулю, вызываются последними. Мы видим, что номера обработчиков pop()
отключаются в обратном порядке.
7.3. Отказ от подтипов, простой способ
Что случилось с handleMessage()
в приведенном выше тесте? Нам не нужно использовать RejectSubTypes.class
для фильтрации наших подтипов.
RejectSubTypes
— это логический флаг, который обеспечивает ту же фильтрацию, что и класс, но с более высокой производительностью, чем реализация IMessageFilter
.
Однако нам по-прежнему нужно использовать реализацию на основе фильтров только для приема подтипов.
8. Заключение
MBassador — это простая и понятная библиотека для передачи сообщений между объектами. Сообщения могут быть организованы различными способами и могут быть отправлены синхронно или асинхронно.
И, как всегда, пример доступен в этом проекте GitHub .