Перейти к основному содержимому

Надежный обмен сообщениями с JGroups

· 7 мин. чтения

1. Обзор

JGroups — это Java API для надежного обмена сообщениями. Он имеет простой интерфейс, который обеспечивает:

  • гибкий стек протоколов, включая TCP и UDP
  • фрагментация и повторная сборка больших сообщений
  • надежный одноадресный и многоадресный
  • обнаружение отказа
  • управление потоком

А также многие другие функции.

В этом руководстве мы создадим простое приложение для обмена строковыми сообщениями между приложениями и предоставления общего состояния новым приложениям по мере их подключения к сети.

2. Настройка

2.1. Зависимость от Maven

Нам нужно добавить одну зависимость в наш pom.xml :

<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>4.0.10.Final</version>
</dependency>

Последнюю версию библиотеки можно проверить на Maven Central.

2.2. Сеть

JGroups попытается использовать IPV6 по умолчанию. В зависимости от конфигурации нашей системы это может привести к тому, что приложения не смогут обмениваться данными.

Чтобы избежать этого, мы установим для свойства java.net.preferIPv4Stack значение true при запуске наших приложений здесь:

java -Djava.net.preferIPv4Stack=true com.foreach.jgroups.JGroupsMessenger

3. Каналы J

Наше подключение к сети JGroups — это JChannel. Канал присоединяется к кластеру и отправляет и получает сообщения, а также информацию о состоянии сети.

3.1. Создание канала

Мы создаем JChannel с путем к файлу конфигурации. Если мы опустим имя файла, он будет искать udp.xml в текущем рабочем каталоге.

Мы создадим канал с явно названным файлом конфигурации:

JChannel channel = new JChannel("src/main/resources/udp.xml");

Конфигурация JGroups может быть очень сложной, но стандартных конфигураций UDP и TCP достаточно для большинства приложений. Мы включили файл для UDP в наш код и будем использовать его в этом руководстве.

Дополнительную информацию о настройке транспорта смотрите в руководстве по JGroups здесь .

3.2. Подключение канала

После того, как мы создали наш канал, нам нужно присоединиться к кластеру. Кластер — это группа узлов, которые обмениваются сообщениями.

Для присоединения к кластеру требуется имя кластера:

channel.connect("ForEach");

Первый узел, который попытается присоединиться к кластеру, создаст его, если он не существует. Ниже мы увидим этот процесс в действии.

3.3. Название канала

Узлы идентифицируются по имени, поэтому одноранговые узлы могут отправлять направленные сообщения и получать уведомления о том, кто входит и выходит из кластера. JGroups назначит имя автоматически, или мы можем установить свое собственное:

channel.name("user1");

Мы будем использовать эти имена ниже, чтобы отслеживать, когда узлы входят в кластер и покидают его.

3.4. Закрытие канала

Очистка канала необходима, если мы хотим, чтобы одноранговые узлы получали своевременное уведомление о выходе.

Мы закрываем JChannel с помощью метода close:

channel.close()

4. Изменения представления кластера

Создав JChannel, мы теперь готовы видеть состояние одноранговых узлов в кластере и обмениваться с ними сообщениями.

JGroups поддерживает состояние кластера внутри класса View . Каждый канал имеет один вид сети. Когда представление изменяется, оно доставляется с помощью обратного вызова viewAccepted() .

В этом руководстве мы расширим класс API ReceiverAdaptor , который реализует все методы интерфейса, необходимые для приложения.

Это рекомендуемый способ реализации обратных вызовов.

Давайте добавим viewAccepted в наше приложение:

public void viewAccepted(View newView) {

private View lastView;

if (lastView == null) {
System.out.println("Received initial view:");
newView.forEach(System.out::println);
} else {
System.out.println("Received new view.");

List<Address> newMembers = View.newMembers(lastView, newView);
System.out.println("New members: ");
newMembers.forEach(System.out::println);

List<Address> exMembers = View.leftMembers(lastView, newView);
System.out.println("Exited members:");
exMembers.forEach(System.out::println);
}
lastView = newView;
}

Каждое представление содержит список объектов Address , представляющих каждого члена кластера. JGroups предлагает удобные методы для сравнения одного представления с другим, которые мы используем для обнаружения новых или закрытых членов кластера.

5. Отправка сообщений

Обработка сообщений в JGroups проста. Сообщение содержит массив байтов и объекты Address , соответствующие отправителю и получателю.

В этом руководстве мы используем строки , считываемые из командной строки, но легко увидеть, как приложение может обмениваться другими типами данных.

5.1. Широковещательные сообщения

Сообщение создается с назначением и массивом байтов; JChannel устанавливает для нас отправителя. Если цель равна null , `` весь кластер получит сообщение.

Мы примем текст из командной строки и отправим его в кластер:

System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);

Если мы запустим несколько экземпляров нашей программы и отправим это сообщение (после того, как мы реализуем метод receive() ниже), все они получат его, включая отправителя.

5.2. Блокировка наших сообщений

Если мы не хотим видеть наши сообщения, мы можем установить для этого свойство:

channel.setDiscardOwnMessages(true);

Когда мы запускаем предыдущий тест, отправитель сообщения не получает свое широковещательное сообщение.

5.3. Прямые сообщения

Для отправки прямого сообщения требуется действительный адрес . Если мы обращаемся к узлам по имени, нам нужен способ поиска адреса . К счастью, у нас есть View для этого.

Текущее представление всегда доступно из JChannel :

private Optional<address> getAddress(String name) { 
View view = channel.view();
return view.getMembers().stream()
.filter(address -> name.equals(address.toString()))
.findAny();
}

Имена адресов доступны через метод класса toString() , поэтому мы просто ищем в списке членов кластера нужное нам имя.

Таким образом, мы можем принять имя из консоли, найти связанный пункт назначения и отправить прямое сообщение:

Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
.orElseThrow(() -> new Exception("Destination not found");
Message message = new Message(destination, "Hi there!");
channel.send(message);

6. Получение сообщений

Мы можем отправлять сообщения, теперь давайте добавим попробовать их получить сейчас.

Давайте переопределим пустой метод получения ReceiverAdaptor :

public void receive(Message message) {
String line = Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);
}

Поскольку мы знаем, что сообщение содержит строку , мы можем безопасно передать getObject() в System.out .

7. Государственная биржа

Когда узел входит в сеть, ему может потребоваться получить информацию о состоянии кластера. JGroups предоставляет для этого механизм передачи состояния.

Когда узел присоединяется к кластеру, он просто вызывает getState() . Кластер обычно получает состояние от самого старшего члена группы — координатора.

Давайте добавим счетчик широковещательных сообщений в наше приложение. Мы добавим новую переменную-член и увеличим ее внутри receive() :

private Integer messageCount = 0;

public void receive(Message message) {
String line = "Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);

if (message.getDest() == null) {
messageCount++;
System.out.println("Message count: " + messageCount);
}
}

Мы проверяем нулевой пункт назначения, потому что, если мы подсчитаем прямые сообщения, каждый узел будет иметь разный номер.

Далее мы переопределяем еще два метода в ReceiverAdaptor :

public void setState(InputStream input) {
try {
messageCount = Util.objectFromStream(new DataInputStream(input));
} catch (Exception e) {
System.out.println("Error deserialing state!");
}
System.out.println(messageCount + " is the current messagecount.");
}

public void getState(OutputStream output) throws Exception {
Util.objectToStream(messageCount, new DataOutputStream(output));
}

Подобно сообщениям, JGroups передает состояние в виде массива байтов .

JGroups предоставляет координатору InputStream для записи состояния и OutputStream для чтения новым узлом. API предоставляет удобные классы для сериализации и десериализации данных.

Обратите внимание, что в производственном коде доступ к информации о состоянии должен быть потокобезопасным.

Наконец, мы добавляем вызов getState() в наш запуск после подключения к кластеру:

channel.connect(clusterName);
channel.getState(null, 0);

getState() принимает место назначения, из которого запрашивается состояние, и время ожидания в миллисекундах. Нулевой пункт назначения указывает на координатора, а 0 означает отсутствие тайм-аута.

Когда мы запускаем это приложение с парой узлов и обмениваемся широковещательными сообщениями, мы видим увеличение количества сообщений.

Затем, если мы добавим третьего клиента или остановим и запустим один из них, мы увидим, что вновь подключенный узел печатает правильное количество сообщений.

8. Заключение

В этом руководстве мы использовали JGroups для создания приложения для обмена сообщениями. Мы использовали API для отслеживания того, какие узлы подключались к кластеру и покидали его, а также для передачи состояния кластера новому узлу при его присоединении.

Образцы кода, как всегда, можно найти на GitHub .