1. Обзор
В предыдущей статье у нас было краткое введение в Kafka Connect, включая различные типы соединителей, основные функции Connect, а также REST API.
В этом руководстве мы будем использовать коннекторы Kafka для создания более «реального» примера.
Мы будем использовать коннектор для сбора данных через MQTT и записывать собранные данные в MongoDB.
2. Настройка с помощью Docker
Мы будем использовать Docker Compose для настройки инфраструктуры. Это включает в себя брокера MQTT в качестве источника, Zookeeper, одного брокера Kafka, а также Kafka Connect в качестве промежуточного программного обеспечения и, наконец, экземпляр MongoDB, включающий инструмент с графическим интерфейсом в качестве приемника.
2.1. Установка разъема
Коннекторы, необходимые для нашего примера, источник MQTT, а также коннектор приемника MongoDB, не включены в простую Kafka или Confluent Platform.
Как мы обсуждали в предыдущей статье, мы можем загрузить коннекторы ( MQTT , а также MongoDB ) из концентратора Confluent. После этого нам нужно распаковать банки в папку, которую мы подключим к контейнеру Kafka Connect в следующем разделе.
Воспользуемся для этого папкой /tmp/custom/jars
. Мы должны переместить туда jar-файлы перед запуском стека компоновки в следующем разделе, поскольку Kafka Connect загружает коннекторы онлайн во время запуска.
2.2. Файл компоновки Docker
Мы описываем нашу настройку как простой компоновочный файл Docker, который состоит из шести контейнеров:
version: '3.3'
services:
mosquitto:
image: eclipse-mosquitto:1.5.5
hostname: mosquitto
container_name: mosquitto
expose:
- "1883"
ports:
- "1883:1883"
zookeeper:
image: zookeeper:3.4.9
restart: unless-stopped
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./zookeeper/data:/data
- ./zookeeper/datalog:/datalog
kafka:
image: confluentinc/cp-kafka:5.1.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./kafka/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka-connect:
image: confluentinc/cp-kafka-connect:5.1.0
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /tmp/custom/jars:/etc/kafka-connect/jars
depends_on:
- zookeeper
- kafka
- mosquitto
mongo-db:
image: mongo:4.0.5
hostname: mongo-db
container_name: mongo-db
expose:
- "27017"
ports:
- "27017:27017"
command: --bind_ip_all --smallfiles
volumes:
- ./mongo-db:/data
mongoclient:
image: mongoclient/mongoclient:2.2.0
container_name: mongoclient
hostname: mongoclient
depends_on:
- mongo-db
ports:
- 3000:3000
environment:
MONGO_URL: "mongodb://mongo-db:27017"
PORT: 3000
expose:
- "3000"
Контейнер mosquitto
предоставляет простой брокер MQTT на основе Eclipse Mosquitto.
Контейнеры zookeeper
и kafka
определяют кластер Kafka с одним узлом.
kafka-connect
определяет наше приложение Connect в распределенном режиме.
И, наконец, mongo-db
определяет нашу базу данных приемника, а также веб- сервер mongoclient
, который помогает нам проверить, правильно ли отправленные данные поступили в базу данных.
Мы можем запустить стек с помощью следующей команды:
docker-compose up
3. Конфигурация разъема
Поскольку Kafka Connect запущен и работает, теперь мы можем настроить коннекторы.
3.1. Настройка исходного соединителя
Давайте настроим исходный коннектор с помощью REST API:
curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
Наш файл connect-mqtt-source.json
выглядит так:
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://mosquitto:1883",
"mqtt.topics": "foreach",
"kafka.topic": "connect-custom",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "kafka:9092",
"confluent.topic.replication.factor": 1
}
}
Есть несколько свойств, которые мы раньше не использовали:
mqtt.server.uri
— это конечная точка, к которой будет подключаться наш коннектор.mqtt.topics
— это тема MQTT, на которую подпишется наш коннектор .kafka.topic
определяет тему Kafka, в которую коннектор будет отправлять полученные данные.value.converter
определяет преобразователь, который будет применяться к полученной полезной нагрузке. Нам нуженByteArrayConverter
, так как коннектор MQTT по умолчанию использует Base64, а мы хотим использовать простой текстconfluent.topic.bootstrap.servers
требуется для новейшей версии коннектора- То же самое относится к
confluent.topic.replication.factor
: он определяет коэффициент репликации для внутренней темы Confluent — поскольку у нас есть только один узел в нашем кластере, мы должны установить это значение равным 1.
3.2. Коннектор тестового источника
Давайте проведем быстрый тест, опубликовав короткое сообщение брокеру MQTT:
docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto -t "foreach" -m "{\"id\":1234,\"message\":\"This is a test\"}"
А если слушать в тему, то connect-custom
:
docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning
то мы должны увидеть наше тестовое сообщение.
3.3. Коннектор приемника установки
Далее нам понадобится наш сливной соединитель. Давайте снова воспользуемся REST API:
curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
Наш файл connect-mongodb-sink.json
выглядит так:
{
"name": "mongodb-sink",
"config": {
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"tasks.max": 1,
"topics": "connect-custom",
"mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
"mongodb.collection": "MyCollection",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Здесь у нас есть следующие специфичные для MongoDB свойства:
mongodb.connection.uri
содержит строку подключения для нашего экземпляра MongoDB.mongodb.collection
определяет коллекцию- Поскольку коннектор MongoDB ожидает JSON, мы должны установить
JsonConverter
дляkey.converter
иvalue.converter.
- И нам также нужен JSON без схемы для MongoDB, поэтому мы должны установить
key.converter.schemas.enable
иvalue.converter.schemas.enable
вfalse
3.4. Соединитель тестового приемника
Поскольку наша тема connect-custom
уже содержит сообщения из теста коннектора MQTT, коннектор MongoDB должен был получить их сразу после создания .
Следовательно, мы должны сразу найти их в нашей MongoDB. Для этого мы можем использовать веб-интерфейс, открыв URL-адрес http://localhost:3000/
. После входа в систему мы можем выбрать нашу MyCollection
слева, нажать Execute
, и наше тестовое сообщение должно отображаться.
3.5. Сквозной тест
Теперь мы можем отправить любую структуру JSON с помощью клиента MQTT:
{
"firstName": "John",
"lastName": "Smith",
"age": 25,
"address": {
"streetAddress": "21 2nd Street",
"city": "New York",
"state": "NY",
"postalCode": "10021"
},
"phoneNumber": [{
"type": "home",
"number": "212 555-1234"
}, {
"type": "fax",
"number": "646 555-4567"
}],
"gender": {
"type": "male"
}
}
MongoDB поддерживает документы JSON без схем, а поскольку мы отключили схемы для нашего конвертера, любая структура немедленно передается через нашу цепочку соединителей и сохраняется в базе данных.
Опять же, мы можем использовать веб-интерфейс по адресу http://localhost:3000/
.
3.6. Очистить
Когда мы закончим, мы можем очистить наш эксперимент и удалить два соединителя:
curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink
После этого мы можем закрыть стек Compose с помощью Ctrl + C.
4. Вывод
В этом руководстве мы создали пример с использованием Kafka Connect для сбора данных через MQTT и записи собранных данных в MongoDB.
Как всегда, файлы конфигурации можно найти на GitHub .