1. Обзор
В этом руководстве мы будем использовать Java-клиент для NAT для подключения к серверу NATS , публикации и получения сообщений.
NATS предлагает три основных режима обмена сообщениями. Семантика публикации/подписки доставляет сообщения всем подписчикам темы. Обмен сообщениями «запрос/ответ» отправляет запросы через темы и направляет ответы обратно запрашивающей стороне.
Подписчики также могут присоединяться к группам очередей сообщений при подписке на тему. Сообщения, отправленные в связанную тему, доставляются только одному подписчику в группе очереди.
2. Настройка
2.1. Зависимость от Maven
Во-первых, нам нужно добавить библиотеку NATS в наш pom.xml:
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>1.0</version>
</dependency>
Последнюю версию библиотеки можно найти здесь , а проект Github — здесь .
2.2. NATS-сервер
Во-вторых, нам понадобится сервер NATS для обмена сообщениями. Здесь есть инструкции для всех основных платформ .
Мы предполагаем, что на локальном хосте: 4222 работает сервер.
3. Подключайтесь и обменивайтесь сообщениями
3.1. Подключиться к НАТС
Метод connect()
в статическом классе NATS создает Connections
.
Если мы хотим использовать соединение с параметрами по умолчанию и прослушивание на локальном хосте через порт 4222, мы можем использовать метод по умолчанию:
Connection natsConnection = Nats.connect();
Но у Connections
есть много настраиваемых параметров, некоторые из которых мы хотим переопределить.
Мы создадим объект Options
и передадим его Nats
:
private Connection initConnection() {
Options options = new Options.Builder()
.errorCb(ex -> log.error("Connection Exception: ", ex))
.disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
.reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
.build();
return Nats.connect(uri, options);
}
Соединения
NATS долговечны. API попытается восстановить потерянное соединение.
Мы установили обратные вызовы, чтобы уведомлять нас об отключении и восстановлении соединения. В этом примере мы используем лямбда-выражения, но для приложений, которым нужно больше, чем просто регистрировать событие, мы можем установить объекты, реализующие требуемые интерфейсы.
Мы можем провести быстрый тест. Создайте соединение и добавьте сон на 60 секунд, чтобы процесс продолжал работать:
Connection natsConnection = initConnection();
Thread.sleep(60000);
Запустите это. Затем остановите и запустите сервер NATS:
[jnats-callbacks] ERROR com.foreach.nats.NatsClient
- Channel disconnected: io.nats.client.ConnectionImpl@79428dc1
[reconnect] WARN io.nats.client.ConnectionImpl
- couldn't connect to nats://localhost:4222 (nats: connection read error)
[jnats-callbacks] ERROR com.foreach.nats.NatsClient
- Reconnected to server: io.nats.client.ConnectionImpl@79428dc1
Мы можем видеть, что обратные вызовы регистрируют отключение и повторное подключение.
3.2. Подписаться на сообщения
Теперь, когда у нас есть соединение, мы можем заняться обработкой сообщений.
Сообщение
NATS — это контейнер для массива bytes[]
. В дополнение к ожидаемым методам setData(byte[])
и byte[] getData()
существуют методы для установки и получения адресата сообщения и ответа на темы.
Мы подписываемся на темы, которые являются Strings.
NATS поддерживает как синхронные, так и асинхронные подписки.
Давайте посмотрим на асинхронную подписку:
AsyncSubscription subscription = natsConnection
.subscribe( topic, msg -> log.info("Received message on {}", msg.getSubject()));
API доставляет сообщения
нашему MessageHandler()
в своем потоке.
Некоторым приложениям может потребоваться вместо этого управлять потоком, обрабатывающим сообщения:
SyncSubscription subscription = natsConnection.subscribeSync("foo.bar");
Message message = subscription.nextMessage(1000);
SyncSubscription
имеет блокирующий метод nextMessage()
, который будет блокироваться на указанное количество миллисекунд. Мы будем использовать синхронные подписки для наших тестов, чтобы сделать тестовые примеры простыми.
У AsyncSubscription
и SyncSubscription
есть метод unsubscribe ()
, который мы можем использовать для закрытия подписки.
subscription.unsubscribe();
3.3. Публикация сообщений
Публикация сообщений
может осуществляться несколькими способами.
Самый простой метод требует только строку
темы и байты
сообщения :
natsConnection.publish("foo.bar", "Hi there!".getBytes());
Если издатель хочет получить ответ или предоставить конкретную информацию об источнике сообщения, он также может отправить сообщение с темой для ответа:
natsConnection.publish("foo.bar", "bar.foo", "Hi there!".getBytes());
Существуют также перегрузки для нескольких других комбинаций, таких как передача сообщения
вместо байтов
.
3.4. Простой обмен сообщениями
Имея действительный Connection
, мы можем написать тест, который проверяет обмен сообщениями:
SyncSubscription fooSubscription = natsConnection.subscribe("foo.bar");
SyncSubscription barSubscription = natsConnection.subscribe("bar.foo");
natsConnection.publish("foo.bar", "bar.foo", "hello there".getBytes());
Message message = fooSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
natsConnection
.publish(message.getReplyTo(), message.getSubject(), "hello back".getBytes());
message = barSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));
Мы начинаем с подписки на две темы с синхронными подписками, так как они намного лучше работают в тесте JUnit. Затем мы отправляем сообщение одному из них, указав другой в качестве адреса replyTo .
После прочтения сообщения от первого адресата мы «переворачиваем» темы для отправки ответа.
3.5. Подписки с подстановочными знаками
Сервер NATS поддерживает подстановочные знаки темы.
Подстановочные знаки работают с токенами темы, разделенными знаком «.». персонаж. Символ звездочки '*' соответствует отдельному токену. Символ «больше» '>' является подстановочным знаком для оставшейся части темы, которая может быть более чем одним токеном.
Например:
- foo.* соответствует foo.bar, foo.requests, но не
соответствует foo.bar.requests
- foo.> соответствует foo.bar, foo.requests, foo.bar.requests, foo.bar.foreach и т. д.
Давайте попробуем несколько тестов:
SyncSubscription fooSubscription = client.subscribeSync("foo.*");
client.publishMessage("foo.bar", "bar.foo", "hello there");
Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);
SyncSubscription barSubscription = client.subscribeSync("foo.>");
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
4. Запрос/ответ на сообщения
Наш тест обмена сообщениями напоминал распространенную идиому в системах обмена сообщениями pub/sub; запрос/ответ. NATS имеет явную поддержку для этого обмена сообщениями запроса/ответа .
Издатели могут установить обработчик запросов, используя метод асинхронной подписки, который мы использовали выше:
AsyncSubscription subscription = natsConnection
.subscribe("foo.bar.requests", new MessageHandler() {
@Override
public void onMessage(Message msg) {
natsConnection.publish(message.getReplyTo(), reply.getBytes());
}
});
Или они могут отвечать на запросы по мере их поступления.
API предоставляет метод request() :
Message reply = natsConnection.request("foo.bar.requests", request.getBytes(), 100);
Этот метод создает временный почтовый ящик для ответа и записывает для нас адрес для ответа.
Request()
возвращает ответ или null
, если время ожидания запроса истекло. Последний аргумент — количество миллисекунд ожидания.
Мы можем изменить наш тест для запроса/ответа:
natsConnection.subscribe(salary.requests", message -> {
natsConnection.publish(message.getReplyTo(), "denied!".getBytes());
});
Message reply = natsConnection.request("salary.requests", "I need a raise.", 100);
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));
5. Очереди сообщений
Подписчики могут указать группы очередей во время подписки. Когда сообщение публикуется в группе, NATS доставляет его одному и только одному подписчику .
Группы очередей не сохраняют сообщения. Если прослушиватели недоступны, сообщение отбрасывается.
5.1. Подписка на очереди
Подписчики указывают имя группы очередей в виде строки:
SyncSubscription subscription = natsConnection.subscribe("topic", "queue name");
Конечно, есть и асинхронная версия:
SyncSubscription subscription = natsConnection
.subscribe("topic", "queue name", new MessageHandler() {
@Override
public void onMessage(Message msg) {
log.info("Received message on {}", msg.getSubject());
}
});
Подписка создает очередь на сервере NATS.
5.2. Публикация в очереди
Публикация сообщения в группах очередей просто требует публикации в соответствующей теме:
natsConnection.publish("foo", "queue message".getBytes());
Сервер NATS направит сообщение в очередь и выберет получателя сообщения.
Мы можем проверить это с помощью теста:
SyncSubscription queue1 = natsConnection.subscribe("foo", "queue name");
SyncSubscription queue2 = natsConnection.subscribe("foo", "queue name");
natsConnection.publish("foo", "foobar".getBytes());
List<Message> messages = new ArrayList<>();
Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);
message = queue2.nextMessage(200);
if (message != null) messages.add(message);
assertEquals(1, messages.size());
Мы получаем только одно сообщение.
Если мы изменим первые две строки на обычную подписку:
SyncSubscription queue1 = natsConnection.subscribe("foo");
SyncSubscription queue2 = natsConnection.subscribe("foo");
Тест завершается неудачно, поскольку сообщение доставляется обоим подписчикам.
6. Заключение
В этом кратком введении мы подключились к серверу NATS и отправили как сообщения публикации/подписки, так и сообщения очереди с балансировкой нагрузки. Мы рассмотрели поддержку NATS для подписок с подстановочными знаками. Мы также использовали обмен сообщениями запрос/ответ.
Образцы кода, как всегда, можно найти на GitHub .