Перейти к основному содержимому

MQTT-клиент на Java

· 6 мин. чтения

1. Обзор

В этом руководстве мы увидим, как мы можем добавить обмен сообщениями MQTT в проект Java, используя библиотеки, предоставленные проектом Eclipse Paho .

2. Учебник по MQTT

MQTT (MQ Telemetry Transport) — это протокол обмена сообщениями, который был создан для удовлетворения потребности в простом и легком методе передачи данных на устройства с низким энергопотреблением и с них, например, используемые в промышленных приложениях.

С ростом популярности устройств IoT (Интернет вещей) MQTT получил все более широкое распространение, что привело к его стандартизации OASIS и ISO.

Протокол поддерживает единый шаблон обмена сообщениями, а именно шаблон публикации-подписки: каждое сообщение, отправляемое клиентом, содержит связанную «тему», которая используется брокером для маршрутизации его подписавшимся клиентам. Имена тем могут быть простыми строками, такими как « oiltemp », или строкой в виде пути « motor/1/rpm ».

Чтобы получать сообщения, клиент подписывается на одну или несколько тем, используя их точное название или строку, содержащую один из поддерживаемых подстановочных знаков («#» для многоуровневых тем и «+» для одноуровневых).

3. Настройка проекта

Чтобы включить библиотеку Paho в проект Maven, мы должны добавить следующую зависимость:

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>

Последнюю версию модуля библиотеки Eclipse Paho Java можно загрузить с Maven Central.

4. Настройка клиента

При использовании библиотеки Paho первое, что нам нужно сделать, чтобы отправлять и/или получать сообщения от брокера MQTT, — это получить реализацию интерфейса IMqttClient . Этот интерфейс содержит все методы, необходимые приложению для установления соединения с сервером, отправки и получения сообщений.

Paho поставляется с двумя реализациями этого интерфейса: асинхронной ( MqttAsyncClient ) и синхронной ( MqttClient ). `` В нашем случае мы сосредоточимся на синхронной версии, которая имеет более простую семантику.

Сама установка представляет собой двухэтапный процесс: сначала мы создаем экземпляр класса MqttClient , а затем подключаем его к нашему серверу. В следующем подразделе подробно описаны эти шаги.

4.1. Создание нового экземпляра IMqttClient

В следующем фрагменте кода показано, как создать новый синхронный экземпляр IMqttClient :

String publisherId = UUID.randomUUID().toString();
IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

В этом случае мы используем простейший доступный конструктор, который принимает адрес конечной точки нашего брокера MQTT и идентификатор клиента , который однозначно идентифицирует нашего клиента.

В нашем случае мы использовали случайный UUID, поэтому при каждом запуске будет генерироваться новый идентификатор клиента.

Paho также предоставляет дополнительные конструкторы, которые мы можем использовать для настройки механизма сохраняемости, используемого для хранения неподтвержденных сообщений, и/или ScheduledExecutorService , используемого для запуска фоновых задач, требуемых реализацией механизма протокола.

Конечная точка сервера, которую мы используем, — это общедоступный брокер MQTT, размещенный в проекте Paho , который позволяет любому, у кого есть подключение к Интернету, тестировать клиентов без необходимости какой-либо аутентификации.

4.2. Подключение к серверу

Наш недавно созданный экземпляр MqttClient не подключен к серверу. Мы делаем это, вызывая его метод connect() , при необходимости передавая экземпляр MqttConnectOptions , который позволяет нам настраивать некоторые аспекты протокола.

В частности, мы можем использовать эти параметры для передачи дополнительной информации, такой как учетные данные безопасности, режим восстановления сеанса, режим повторного подключения и так далее.

Класс MqttConnectionOptions предоставляет эти параметры как простые свойства, которые мы можем установить с помощью обычных методов установки. Нам нужно только установить свойства, необходимые для нашего сценария — остальные примут значения по умолчанию.

Код, используемый для установления соединения с сервером, обычно выглядит так:

MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);

Здесь мы определяем наши параметры подключения, чтобы:

  • Библиотека автоматически попытается переподключиться к серверу в случае сбоя сети.
  • Он будет отбрасывать неотправленные сообщения из предыдущего запуска
  • Время ожидания соединения установлено на 10 секунд.

5. Отправка сообщений

Отправка сообщений с помощью уже подключенного MqttClient очень проста. Мы используем один из вариантов метода publish() для отправки полезной нагрузки, которая всегда представляет собой массив байтов, в заданную тему с использованием одного из следующих параметров качества обслуживания:

  • 0 — семантика «не более одного раза», также известная как «выстрелил-забыл». Используйте эту опцию, когда потеря сообщения приемлема, так как она не требует какого-либо подтверждения или постоянства.
  • 1 – семантика «хотя бы один раз». Используйте этот вариант, если потеря сообщений неприемлема и ваши подписчики могут обрабатывать дубликаты.
  • 2 – семантика «ровно один раз». Используйте этот вариант, когда потеря сообщений неприемлема и ваши подписчики не могут обрабатывать дубликаты.

В нашем примере проекта класс EngineTemperatureSensor играет роль фиктивного датчика, который выдает новое значение температуры каждый раз, когда мы вызываем его метод call() .

Этот класс реализует интерфейс Callable , поэтому мы можем легко использовать его с одной из реализаций ExecutorService , доступных в пакете java.util.concurrent :

public class EngineTemperatureSensor implements Callable<Void> {

// ... private members omitted

public EngineTemperatureSensor(IMqttClient client) {
this.client = client;
}

@Override
public Void call() throws Exception {
if ( !client.isConnected()) {
return null;
}
MqttMessage msg = readEngineTemp();
msg.setQos(0);
msg.setRetained(true);
client.publish(TOPIC,msg);
return null;
}

private MqttMessage readEngineTemp() {
double temp = 80 + rnd.nextDouble() * 20.0;
byte[] payload = String.format("T:%04.2f",temp)
.getBytes();
return new MqttMessage(payload);
}
}

MqttMessage инкапсулирует саму полезную нагрузку, запрошенное качество обслуживания, а также сохраненный флаг для сообщения. Этот флаг указывает брокеру, что он должен сохранять это сообщение до тех пор, пока оно не будет использовано подписчиком.

Мы можем использовать эту функцию для реализации «последнего известного хорошего» поведения, поэтому, когда новый подписчик подключается к серверу, он сразу же получает сохраненное сообщение.

6. Получение сообщений

Чтобы получать сообщения от MQTT-брокера, нам нужно использовать один из вариантов метода subscribe() , которые позволяют указать:

  • Один или несколько фильтров тем для сообщений, которые мы хотим получать
  • Связанный QoS
  • Обработчик обратного вызова для обработки полученных сообщений

В следующем примере показано, как добавить прослушиватель сообщений в существующий экземпляр IMqttClient для получения сообщений из заданной темы. Мы используем CountDownLatch в качестве механизма синхронизации между нашим обратным вызовом и основным потоком выполнения, уменьшая его каждый раз, когда приходит новое сообщение.

В примере кода мы использовали другой экземпляр IMqttClient для получения сообщений. Мы сделали это просто для того, чтобы было понятнее, какой клиент что делает, но это не ограничение Paho — при желании вы можете использовать один и тот же клиент для публикации и получения сообщений:

CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
byte[] payload = msg.getPayload();
// ... payload handling omitted
receivedSignal.countDown();
});
receivedSignal.await(1, TimeUnit.MINUTES);

Используемый выше вариант subscribe() принимает экземпляр IMqttMessageListener в качестве второго аргумента.

В нашем случае мы используем простую лямбда-функцию, которая обрабатывает полезную нагрузку и уменьшает значение счетчика. Если в указанное временное окно (1 минута) поступает недостаточно сообщений, метод await() выдает исключение.

При использовании Paho нам не нужно явно подтверждать получение сообщения. Если обратный вызов возвращается нормально, Paho считает его успешным и отправляет подтверждение на сервер.

Если обратный вызов выдает Exception , клиент будет закрыт. Обратите внимание, что это приведет к потере всех сообщений, отправленных с уровнем QoS, равным 0 .

Сообщения, отправленные с уровнем QoS 1 или 2, будут повторно отправлены сервером после повторного подключения клиента и повторной подписки на тему.

7. Заключение

В этой статье мы продемонстрировали, как мы можем добавить поддержку протокола MQTT в наши Java-приложения, используя библиотеку, предоставленную проектом Eclipse Paho.

Эта библиотека обрабатывает все детали низкоуровневого протокола, что позволяет нам сосредоточиться на других аспектах нашего решения, оставляя при этом достаточно места для настройки важных аспектов его внутренних функций, таких как сохраняемость сообщений.

Код, показанный в этой статье, доступен на GitHub .