Перейти к основному содержимому

Пример Kafka Connect с MQTT и MongoDB

· 6 мин. чтения

1. Обзор

В предыдущей статье у нас было краткое введение в Kafka Connect, включая различные типы соединителей, основные функции Connect, а также REST API.

В этом руководстве мы будем использовать коннекторы Kafka для создания более «реального» примера.

Мы будем использовать коннектор для сбора данных через MQTT и записывать собранные данные в MongoDB.

2. Настройка с помощью Docker

Мы будем использовать Docker Compose для настройки инфраструктуры. Это включает в себя брокера MQTT в качестве источника, Zookeeper, одного брокера Kafka, а также Kafka Connect в качестве промежуточного программного обеспечения и, наконец, экземпляр MongoDB, включающий инструмент с графическим интерфейсом в качестве приемника.

2.1. Установка разъема

Коннекторы, необходимые для нашего примера, источник MQTT, а также коннектор приемника MongoDB, не включены в простую Kafka или Confluent Platform.

Как мы обсуждали в предыдущей статье, мы можем загрузить коннекторы ( MQTT , а также MongoDB ) из концентратора Confluent. После этого нам нужно распаковать банки в папку, которую мы подключим к контейнеру Kafka Connect в следующем разделе.

Воспользуемся для этого папкой /tmp/custom/jars . Мы должны переместить туда jar-файлы перед запуском стека компоновки в следующем разделе, поскольку Kafka Connect загружает коннекторы онлайн во время запуска.

2.2. Файл компоновки Docker

Мы описываем нашу настройку как простой компоновочный файл Docker, который состоит из шести контейнеров:

version: '3.3'

services:
mosquitto:
image: eclipse-mosquitto:1.5.5
hostname: mosquitto
container_name: mosquitto
expose:
- "1883"
ports:
- "1883:1883"
zookeeper:
image: zookeeper:3.4.9
restart: unless-stopped
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./zookeeper/data:/data
- ./zookeeper/datalog:/datalog
kafka:
image: confluentinc/cp-kafka:5.1.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./kafka/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka-connect:
image: confluentinc/cp-kafka-connect:5.1.0
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /tmp/custom/jars:/etc/kafka-connect/jars
depends_on:
- zookeeper
- kafka
- mosquitto
mongo-db:
image: mongo:4.0.5
hostname: mongo-db
container_name: mongo-db
expose:
- "27017"
ports:
- "27017:27017"
command: --bind_ip_all --smallfiles
volumes:
- ./mongo-db:/data
mongoclient:
image: mongoclient/mongoclient:2.2.0
container_name: mongoclient
hostname: mongoclient
depends_on:
- mongo-db
ports:
- 3000:3000
environment:
MONGO_URL: "mongodb://mongo-db:27017"
PORT: 3000
expose:
- "3000"

Контейнер mosquitto предоставляет простой брокер MQTT на основе Eclipse Mosquitto.

Контейнеры zookeeper и kafka определяют кластер Kafka с одним узлом.

kafka-connect определяет наше приложение Connect в распределенном режиме.

И, наконец, mongo-db определяет нашу базу данных приемника, а также веб- сервер mongoclient , который помогает нам проверить, правильно ли отправленные данные поступили в базу данных.

Мы можем запустить стек с помощью следующей команды:

docker-compose up

3. Конфигурация разъема

Поскольку Kafka Connect запущен и работает, теперь мы можем настроить коннекторы.

3.1. Настройка исходного соединителя

Давайте настроим исходный коннектор с помощью REST API:

curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Наш файл connect-mqtt-source.json выглядит так:

{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://mosquitto:1883",
"mqtt.topics": "foreach",
"kafka.topic": "connect-custom",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "kafka:9092",
"confluent.topic.replication.factor": 1
}
}

Есть несколько свойств, которые мы раньше не использовали:

  • mqtt.server.uri — это конечная точка, к которой будет подключаться наш коннектор.
  • mqtt.topics — это тема MQTT, на которую подпишется наш коннектор .
  • kafka.topic определяет тему Kafka, в которую коннектор будет отправлять полученные данные.
  • value.converter определяет преобразователь, который будет применяться к полученной полезной нагрузке. Нам нужен ByteArrayConverter , так как коннектор MQTT по умолчанию использует Base64, а мы хотим использовать простой текст
  • confluent.topic.bootstrap.servers требуется для новейшей версии коннектора
  • То же самое относится к confluent.topic.replication.factor : он определяет коэффициент репликации для внутренней темы Confluent — поскольку у нас есть только один узел в нашем кластере, мы должны установить это значение равным 1.

3.2. Коннектор тестового источника

Давайте проведем быстрый тест, опубликовав короткое сообщение брокеру MQTT:

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto  -t "foreach" -m "{\"id\":1234,\"message\":\"This is a test\"}"

А если слушать в тему, то connect-custom :

docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning

то мы должны увидеть наше тестовое сообщение.

3.3. Коннектор приемника установки

Далее нам понадобится наш сливной соединитель. Давайте снова воспользуемся REST API:

curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Наш файл connect-mongodb-sink.json выглядит так:

{
"name": "mongodb-sink",
"config": {
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"tasks.max": 1,
"topics": "connect-custom",
"mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
"mongodb.collection": "MyCollection",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}

Здесь у нас есть следующие специфичные для MongoDB свойства:

  • mongodb.connection.uri содержит строку подключения для нашего экземпляра MongoDB.
  • mongodb.collection определяет коллекцию
  • Поскольку коннектор MongoDB ожидает JSON, мы должны установить JsonConverter для key.converter и value.converter.
  • И нам также нужен JSON без схемы для MongoDB, поэтому мы должны установить key.converter.schemas.enable и value.converter.schemas.enable в false

3.4. Соединитель тестового приемника

Поскольку наша тема connect-custom уже содержит сообщения из теста коннектора MQTT, коннектор MongoDB должен был получить их сразу после создания .

Следовательно, мы должны сразу найти их в нашей MongoDB. Для этого мы можем использовать веб-интерфейс, открыв URL-адрес http://localhost:3000/ . После входа в систему мы можем выбрать нашу MyCollection слева, нажать Execute , и наше тестовое сообщение должно отображаться.

3.5. Сквозной тест

Теперь мы можем отправить любую структуру JSON с помощью клиента MQTT:

{
"firstName": "John",
"lastName": "Smith",
"age": 25,
"address": {
"streetAddress": "21 2nd Street",
"city": "New York",
"state": "NY",
"postalCode": "10021"
},
"phoneNumber": [{
"type": "home",
"number": "212 555-1234"
}, {
"type": "fax",
"number": "646 555-4567"
}],
"gender": {
"type": "male"
}
}

MongoDB поддерживает документы JSON без схем, а поскольку мы отключили схемы для нашего конвертера, любая структура немедленно передается через нашу цепочку соединителей и сохраняется в базе данных.

Опять же, мы можем использовать веб-интерфейс по адресу http://localhost:3000/ .

3.6. Очистить

Когда мы закончим, мы можем очистить наш эксперимент и удалить два соединителя:

curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink

После этого мы можем закрыть стек Compose с помощью Ctrl + C.

4. Вывод

В этом руководстве мы создали пример с использованием Kafka Connect для сбора данных через MQTT и записи собранных данных в MongoDB.

Как всегда, файлы конфигурации можно найти на GitHub .