Перейти к основному содержимому

Публикация и получение сообщений с помощью Java-клиента Nats

· 7 мин. чтения

1. Обзор

В этом руководстве мы будем использовать Java-клиент для NAT для подключения к серверу NATS , публикации и получения сообщений.

NATS предлагает три основных режима обмена сообщениями. Семантика публикации/подписки доставляет сообщения всем подписчикам темы. Обмен сообщениями «запрос/ответ» отправляет запросы через темы и направляет ответы обратно запрашивающей стороне.

Подписчики также могут присоединяться к группам очередей сообщений при подписке на тему. Сообщения, отправленные в связанную тему, доставляются только одному подписчику в группе очереди.

2. Настройка

2.1. Зависимость от Maven

Во-первых, нам нужно добавить библиотеку NATS в наш pom.xml:

<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>1.0</version>
</dependency>

Последнюю версию библиотеки можно найти здесь , а проект Github — здесь .

2.2. NATS-сервер

Во-вторых, нам понадобится сервер NATS для обмена сообщениями. Здесь есть инструкции для всех основных платформ .

Мы предполагаем, что на локальном хосте: 4222 работает сервер.

3. Подключайтесь и обменивайтесь сообщениями

3.1. Подключиться к НАТС

Метод connect() в статическом классе NATS создает Connections .

Если мы хотим использовать соединение с параметрами по умолчанию и прослушивание на локальном хосте через порт 4222, мы можем использовать метод по умолчанию:

Connection natsConnection = Nats.connect();

Но у Connections есть много настраиваемых параметров, некоторые из которых мы хотим переопределить.

Мы создадим объект Options и передадим его Nats :

private Connection initConnection() {
Options options = new Options.Builder()
.errorCb(ex -> log.error("Connection Exception: ", ex))
.disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
.reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
.build();

return Nats.connect(uri, options);
}

Соединения NATS долговечны. API попытается восстановить потерянное соединение.

Мы установили обратные вызовы, чтобы уведомлять нас об отключении и восстановлении соединения. В этом примере мы используем лямбда-выражения, но для приложений, которым нужно больше, чем просто регистрировать событие, мы можем установить объекты, реализующие требуемые интерфейсы.

Мы можем провести быстрый тест. Создайте соединение и добавьте сон на 60 секунд, чтобы процесс продолжал работать:

Connection natsConnection = initConnection();
Thread.sleep(60000);

Запустите это. Затем остановите и запустите сервер NATS:

[jnats-callbacks] ERROR com.foreach.nats.NatsClient 
- Channel disconnected: io.nats.client.ConnectionImpl@79428dc1
[reconnect] WARN io.nats.client.ConnectionImpl
- couldn't connect to nats://localhost:4222 (nats: connection read error)
[jnats-callbacks] ERROR com.foreach.nats.NatsClient
- Reconnected to server: io.nats.client.ConnectionImpl@79428dc1

Мы можем видеть, что обратные вызовы регистрируют отключение и повторное подключение.

3.2. Подписаться на сообщения

Теперь, когда у нас есть соединение, мы можем заняться обработкой сообщений.

Сообщение NATS — это контейнер для массива bytes[] . В дополнение к ожидаемым методам setData(byte[]) и byte[] getData() существуют методы для установки и получения адресата сообщения и ответа на темы.

Мы подписываемся на темы, которые являются Strings.

NATS поддерживает как синхронные, так и асинхронные подписки.

Давайте посмотрим на асинхронную подписку:

AsyncSubscription subscription = natsConnection
.subscribe( topic, msg -> log.info("Received message on {}", msg.getSubject()));

API доставляет сообщения нашему MessageHandler() в своем потоке.

Некоторым приложениям может потребоваться вместо этого управлять потоком, обрабатывающим сообщения:

SyncSubscription subscription = natsConnection.subscribeSync("foo.bar");
Message message = subscription.nextMessage(1000);

SyncSubscription имеет блокирующий метод nextMessage() , который будет блокироваться на указанное количество миллисекунд. Мы будем использовать синхронные подписки для наших тестов, чтобы сделать тестовые примеры простыми.

У AsyncSubscription и SyncSubscription есть метод unsubscribe () , который мы можем использовать для закрытия подписки.

subscription.unsubscribe();

3.3. Публикация сообщений

Публикация сообщений может осуществляться несколькими способами.

Самый простой метод требует только строку темы и байты сообщения :

natsConnection.publish("foo.bar", "Hi there!".getBytes());

Если издатель хочет получить ответ или предоставить конкретную информацию об источнике сообщения, он также может отправить сообщение с темой для ответа:

natsConnection.publish("foo.bar", "bar.foo", "Hi there!".getBytes());

Существуют также перегрузки для нескольких других комбинаций, таких как передача сообщения вместо байтов .

3.4. Простой обмен сообщениями

Имея действительный Connection , мы можем написать тест, который проверяет обмен сообщениями:

SyncSubscription fooSubscription = natsConnection.subscribe("foo.bar");
SyncSubscription barSubscription = natsConnection.subscribe("bar.foo");
natsConnection.publish("foo.bar", "bar.foo", "hello there".getBytes());

Message message = fooSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

natsConnection
.publish(message.getReplyTo(), message.getSubject(), "hello back".getBytes());

message = barSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));

Мы начинаем с подписки на две темы с синхронными подписками, так как они намного лучше работают в тесте JUnit. Затем мы отправляем сообщение одному из них, указав другой в качестве адреса replyTo .

После прочтения сообщения от первого адресата мы «переворачиваем» темы для отправки ответа.

3.5. Подписки с подстановочными знаками

Сервер NATS поддерживает подстановочные знаки темы.

Подстановочные знаки работают с токенами темы, разделенными знаком «.». персонаж. Символ звездочки '*' соответствует отдельному токену. Символ «больше» '>' является подстановочным знаком для оставшейся части темы, которая может быть более чем одним токеном.

Например:

  • foo.* соответствует foo.bar, foo.requests, но не соответствует foo.bar.requests
  • foo.> соответствует foo.bar, foo.requests, foo.bar.requests, foo.bar.foreach и т. д.

Давайте попробуем несколько тестов:

SyncSubscription fooSubscription = client.subscribeSync("foo.*");

client.publishMessage("foo.bar", "bar.foo", "hello there");

Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);

SyncSubscription barSubscription = client.subscribeSync("foo.>");

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");

message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

4. Запрос/ответ на сообщения

Наш тест обмена сообщениями напоминал распространенную идиому в системах обмена сообщениями pub/sub; запрос/ответ. NATS имеет явную поддержку для этого обмена сообщениями запроса/ответа .

Издатели могут установить обработчик запросов, используя метод асинхронной подписки, который мы использовали выше:

AsyncSubscription subscription = natsConnection
.subscribe("foo.bar.requests", new MessageHandler() {
@Override
public void onMessage(Message msg) {
natsConnection.publish(message.getReplyTo(), reply.getBytes());
}
});

Или они могут отвечать на запросы по мере их поступления.

API предоставляет метод request() :

Message reply = natsConnection.request("foo.bar.requests", request.getBytes(), 100);

Этот метод создает временный почтовый ящик для ответа и записывает для нас адрес для ответа.

Request() возвращает ответ или null , если время ожидания запроса истекло. Последний аргумент — количество миллисекунд ожидания.

Мы можем изменить наш тест для запроса/ответа:

natsConnection.subscribe(salary.requests", message -> {
natsConnection.publish(message.getReplyTo(), "denied!".getBytes());
});
Message reply = natsConnection.request("salary.requests", "I need a raise.", 100);
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));

5. Очереди сообщений

Подписчики могут указать группы очередей во время подписки. Когда сообщение публикуется в группе, NATS доставляет его одному и только одному подписчику .

Группы очередей не сохраняют сообщения. Если прослушиватели недоступны, сообщение отбрасывается.

5.1. Подписка на очереди

Подписчики указывают имя группы очередей в виде строки:

SyncSubscription subscription = natsConnection.subscribe("topic", "queue name");

Конечно, есть и асинхронная версия:

SyncSubscription subscription = natsConnection
.subscribe("topic", "queue name", new MessageHandler() {
@Override
public void onMessage(Message msg) {
log.info("Received message on {}", msg.getSubject());
}
});

Подписка создает очередь на сервере NATS.

5.2. Публикация в очереди

Публикация сообщения в группах очередей просто требует публикации в соответствующей теме:

natsConnection.publish("foo",  "queue message".getBytes());

Сервер NATS направит сообщение в очередь и выберет получателя сообщения.

Мы можем проверить это с помощью теста:

SyncSubscription queue1 = natsConnection.subscribe("foo", "queue name");
SyncSubscription queue2 = natsConnection.subscribe("foo", "queue name");

natsConnection.publish("foo", "foobar".getBytes());

List<Message> messages = new ArrayList<>();

Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);

message = queue2.nextMessage(200);
if (message != null) messages.add(message);

assertEquals(1, messages.size());

Мы получаем только одно сообщение.

Если мы изменим первые две строки на обычную подписку:

SyncSubscription queue1 = natsConnection.subscribe("foo");
SyncSubscription queue2 = natsConnection.subscribe("foo");

Тест завершается неудачно, поскольку сообщение доставляется обоим подписчикам.

6. Заключение

В этом кратком введении мы подключились к серверу NATS и отправили как сообщения публикации/подписки, так и сообщения очереди с балансировкой нагрузки. Мы рассмотрели поддержку NATS для подписок с подстановочными знаками. Мы также использовали обмен сообщениями запрос/ответ.

Образцы кода, как всегда, можно найти на GitHub .