1. Обзор
Apache Kafka® — это распределенная платформа потоковой передачи. В предыдущем уроке мы обсуждали, как реализовать потребителей и производителей Kafka с помощью Spring .
В этом руководстве мы узнаем, как использовать коннекторы Kafka.
Мы рассмотрим:
- Различные типы коннекторов Kafka
- Функции и режимы Kafka Connect
- Конфигурация соединителей с использованием файлов свойств, а также REST API
2. Основы Kafka Connect и Kafka Connectors
Kafka Connect — это фреймворк для подключения Kafka к внешним системам , таким как базы данных, хранилища ключей и значений, поисковые индексы и файловые системы, с помощью так называемых коннекторов
.
Kafka Connectors — это готовые к использованию компоненты, которые могут помочь нам импортировать данные из внешних систем в темы Kafka и экспортировать данные из тем Kafka во внешние системы . Мы можем использовать существующие реализации соединителей для общих источников данных и приемников или реализовать собственные соединители.
Исходный коннектор
собирает данные из системы. Исходные системы могут быть целыми базами данных, таблицами потоков или брокерами сообщений. Исходный коннектор также может собирать метрики с серверов приложений в темы Kafka, делая данные доступными для потоковой обработки с малой задержкой.
Соединитель приемника
доставляет данные из разделов Kafka в другие системы, которые могут быть индексами, такими как Elasticsearch, пакетными системами, такими как Hadoop, или любой другой базой данных.
Некоторые соединители поддерживаются сообществом, а другие поддерживаются Confluent или его партнерами. Действительно, мы можем найти разъемы для большинства популярных систем, таких как S3, JDBC и Cassandra, и это лишь некоторые из них.
3. Особенности
Возможности Kafka Connect включают в себя:
- Фреймворк для подключения внешних систем к Kafka — упрощает разработку, развертывание и управление коннекторами.
- Распределенный и автономный режимы — это помогает нам развертывать большие кластеры, используя распределенный характер Kafka, а также настройки для разработки, тестирования и небольших производственных развертываний.
- Интерфейс REST — мы можем управлять соединителями с помощью REST API.
- Автоматическое управление смещением — Kafka Connect помогает нам обрабатывать процесс фиксации смещения, что избавляет нас от необходимости вручную реализовывать эту подверженную ошибкам часть разработки коннектора.
- Распределенная и масштабируемая по умолчанию — Kafka Connect использует существующий протокол управления группами; мы можем добавить больше рабочих для масштабирования кластера Kafka Connect.
- Потоковая и пакетная интеграция — Kafka Connect — идеальное решение для объединения систем потоковой и пакетной передачи данных в сочетании с существующими возможностями Kafka.
- Преобразования — они позволяют нам вносить простые и легкие изменения в отдельные сообщения.
4. Настройка
Вместо использования простого дистрибутива Kafka мы загрузим Confluent Platform, дистрибутив Kafka, предоставленный Confluent, Inc., компанией, стоящей за Kafka. Confluent Platform поставляется с некоторыми дополнительными инструментами и клиентами по сравнению с простой Kafka, а также некоторыми дополнительными готовыми соединителями.
Для нашего случая достаточно версии с открытым исходным кодом, которую можно найти на сайте Confluent .
5. Быстрый старт Kafka Connect
Для начала мы обсудим принцип Kafka Connect, используя его самые основные соединители, которые являются соединителем источника
файла и соединителем приемника
файла .
Удобно, что Confluent Platform поставляется с обоими этими коннекторами, а также с эталонными конфигурациями.
5.1. Конфигурация исходного коннектора
Для исходного соединителя эталонная конфигурация доступна по адресу $CONFLUENT_HOME/etc/kafka/connect-file-source.properties
:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
Эта конфигурация имеет некоторые свойства, общие для всех исходных соединителей:
имя
— указанное пользователем имя экземпляра соединителя.Connector.class
указывает класс реализации, в основном вид соединителяtasks.max
указывает, сколько экземпляров нашего исходного коннектора должно работать параллельно, итема
определяет тему, в которую коннектор должен отправлять выходные данные
В этом случае у нас также есть атрибут, специфичный для коннектора:
file
определяет файл, из которого коннектор должен считывать ввод
Чтобы это работало, давайте создадим базовый файл с некоторым содержимым:
echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt
Обратите внимание, что рабочий каталог — $CONFLUENT_HOME.
5.2. Конфигурация разъема приемника
Для нашего соединителя приемника мы будем использовать эталонную конфигурацию в $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Логически он содержит точно такие же параметры, хотя на этот раз коннектор.класс
указывает реализацию коннектора приемника, а файл
— это место, куда коннектор должен записывать содержимое.
5.3. Конфигурация рабочего
Наконец, нам нужно настроить обработчик Connect, который объединит два наших коннектора и выполнит работу по чтению из коннектора-источника и записи в коннектор-приемник.
Для этого мы можем использовать $CONFLUENT_HOME/etc/kafka/connect-standalone.properties
:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java
Обратите внимание, что plugin.path
может содержать список путей, по которым доступны реализации коннектора.
Поскольку мы будем использовать коннекторы в комплекте с Kafka, мы можем установить для plugin.path
значение $CONFLUENT_HOME/share/java
. При работе с Windows может потребоваться указать здесь абсолютный путь.
Для остальных параметров мы можем оставить значения по умолчанию:
bootstrap.servers
содержит адреса брокеров Kafkakey.converter
иvalue.converter
определяют классы-конвертеры, которые сериализуют и десериализуют данные по мере их поступления из источника в Kafka, а затем из Kafka в приемник.key.converter.schemas.enable
иvalue.converter.schemas.enable
— это настройки, специфичные для конвертера .offset.storage.file.filename
— самый важный параметр при работе Connect в автономном режиме: он определяет, где Connect должен хранить данные смещения.offset.flush.interval.ms
определяет интервал, с которым воркер пытается зафиксировать смещения для задач.
И список параметров довольно зрелый, поэтому ознакомьтесь с официальной документацией для получения полного списка.
5.4. Kafka Connect в автономном режиме
И с этим мы можем начать нашу первую настройку коннектора:
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
Во-первых, мы можем проверить содержимое темы с помощью командной строки:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Как мы видим, исходный коннектор взял данные из файла test.txt
, преобразовал их в JSON и отправил в Kafka:
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
И, если мы посмотрим на папку $CONFLUENT_HOME
, мы увидим, что здесь был создан файл test.sink.txt
:
cat $CONFLUENT_HOME/test.sink.txt
foo
bar
Поскольку коннектор приемника извлекает значение из атрибута полезной нагрузки
и записывает его в файл назначения, данные в test.sink.txt
содержат содержимое исходного файла test.txt
.
Теперь добавим больше строк в test.txt.
Когда мы это делаем, мы видим, что коннектор источника автоматически обнаруживает эти изменения.
Нам нужно только убедиться, что в конце вставлена новая строка, иначе исходный коннектор не будет учитывать последнюю строку.
На этом этапе давайте остановим процесс Connect, так как мы запустим Connect в распределенном режиме
несколькими строками.
6. REST API Connect
До сих пор мы выполняли все настройки, передавая файлы свойств через командную строку. Однако, поскольку Connect предназначен для работы в качестве службы, также доступен REST API.
По умолчанию он доступен по адресу http://localhost:8083
. Несколько конечных точек:
GET /connectors
— возвращает список всех используемых коннекторов.GET /connectors/{name}
— возвращает сведения о конкретном соединителе.POST /connectors
— создает новый коннектор; тело запроса должно быть объектом JSON, содержащим поле строкового имени и поле конфигурации объекта с параметрами конфигурации коннектора.GET /connectors/{name}/status
— возвращает текущий статус коннектора, в том числе если он запущен, сбой или приостановлен, к какому рабочему процессу он назначен, информацию об ошибке, если он вышел из строя, и состояние всех его задач.DELETE /connectors/{name}
— удаляет коннектор, изящно останавливая все задачи и удаляя его конфигурацию.GET /connector-plugins —
возвращает список плагинов коннекторов, установленных в кластере Kafka Connect.
Официальная документация предоставляет список со всеми конечными точками.
В следующем разделе мы будем использовать REST API для создания новых соединителей.
7. Kafka Connect в распределенном режиме
Автономный режим идеально подходит для разработки и тестирования, а также для небольших установок. Однако, если мы хотим в полной мере использовать распределенную природу Kafka, мы должны запустить Connect в распределенном режиме.
При этом настройки коннектора и метаданные сохраняются в темах Kafka, а не в файловой системе. В результате рабочие узлы действительно не имеют состояния.
7.1. Запуск подключения
Эталонную конфигурацию для распределенного режима можно найти в $CONFLUENT_HOME /etc/kafka/connect-distributed.properties.
Параметры в основном такие же, как и для автономного режима. Отличий всего несколько:
group.id
определяет имя кластерной группы Connect. Значение должно отличаться от любого идентификатора группы потребителей.offset.storage.topic
,config.storage.topic
иstatus.storage.topic
определяют темы для этих настроек. Для каждой темы мы также можем определить коэффициент репликации
Опять же, официальная документация предоставляет список со всеми параметрами.
Мы можем запустить Connect в распределенном режиме следующим образом:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
7.2. Добавление соединителей с помощью REST API
Теперь, по сравнению с автономной командой запуска, мы не передавали никаких конфигураций соединителя в качестве аргументов. Вместо этого мы должны создать коннекторы с помощью REST API.
Чтобы настроить наш предыдущий пример, мы должны отправить два запроса POST на адрес http://localhost:8083/connectors,
содержащие следующие структуры JSON.
Во-первых, нам нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь мы назовем его connect-file-source.json
:
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-distributed.txt",
"topic": "connect-distributed"
}
}
Обратите внимание, как это выглядит очень похоже на эталонный файл конфигурации, который мы использовали в первый раз.
И затем мы POST это:
curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
Затем мы сделаем то же самое для соединителя приемника, вызвав файл connect-file-sink.json
:
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-distributed.sink.txt",
"topics": "connect-distributed"
}
}
И выполните POST, как раньше:
curl -d @$CONFLUENT_HOME/connect-file-sink.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
При необходимости мы можем убедиться, что эта настройка работает правильно:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
И, если мы посмотрим на папку $CONFLUENT_HOME
, мы увидим, что здесь был создан файл test-distributed.sink.txt
:
cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar
После того, как мы протестировали распределенную установку, давайте очистим ее, удалив два соединителя:
curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink
8. Преобразование данных
8.1. Поддерживаемые преобразования
Преобразования позволяют нам вносить простые и легкие изменения в отдельные сообщения.
Kafka Connect поддерживает следующие встроенные преобразования:
InsertField
— добавьте поле, используя либо статические данные, либо метаданные записи.ReplaceField
— фильтровать или переименовывать поляMaskField
— замените поле допустимым нулевым значением для типа (например, нулем или пустой строкой).HoistField
— Оберните все событие как одно поле внутри структуры или карты.ExtractField
— извлечение определенного поля из структуры и карты и включение в результаты только этого поля.SetSchemaMetadata
— изменение имени или версии схемы.TimestampRouter
— измените тему записи на основе исходной темы и временной метки.RegexRouter
— изменение темы записи на основе исходной темы, строки замены и регулярного выражения.
Преобразование настраивается с использованием следующих параметров:
transforms
— разделенный запятыми список псевдонимов для преобразований.transforms.$alias.type
— Имя класса для преобразованияtransforms.$alias.$transformationSpecificConfig
— Конфигурация для соответствующего преобразования
8.2. Применение трансформатора
Чтобы протестировать некоторые функции преобразования, давайте настроим следующие два преобразования:
- Во-первых, давайте обернем все сообщение как структуру JSON.
- После этого добавим поле в эту структуру
Прежде чем применять наши преобразования, мы должны настроить Connect для использования JSON без схемы, изменив connect-distributed.properties
:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
После этого нам нужно перезапустить Connect, опять же в распределенном режиме:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
Опять же, нам нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь мы назовем его connect-file-source-transform.json.
Помимо уже известных параметров, добавим несколько строк для двух необходимых преобразований:
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data_source",
"transforms.InsertSource.static.value": "test-file-source"
}
}
После этого давайте выполним POST:
curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
Давайте напишем несколько строк в наш test-transformation.txt
:
Foo
Bar
Если мы теперь проверим тему подключения-преобразования
, мы должны получить следующие строки:
{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}
9. Использование готовых коннекторов
После использования этих простых соединителей давайте посмотрим на более продвинутые, готовые к использованию соединители и способы их установки.
9.1. Где найти соединители
Готовые соединители доступны из разных источников:
Несколько соединителей связаны с простым Apache Kafka (источник и приемник для файлов и консоли).
Еще несколько коннекторов входят в состав Confluent Platform (ElasticSearch, HDFS, JDBC и AWS S3).
Также проверьте Confluent Hub , который является своего рода магазином приложений для коннекторов Kafka. Количество предлагаемых коннекторов постоянно растет:
Коннекторы Confluent (разработаны, протестированы, задокументированы и полностью поддерживаются Confluent)
Сертифицированные коннекторы (реализованы сторонней организацией и сертифицированы Confluent)
Коннекторы, разработанные и поддерживаемые сообществом
Кроме того, Confluent также предоставляет страницу соединителей с некоторыми соединителями, которые также доступны в концентраторе Confluent, а также с некоторыми другими соединителями сообщества.
И, наконец, есть производители, которые предоставляют коннекторы как часть своего продукта. Например, Landoop предоставляет потоковую библиотеку под названием Lenses , которая также содержит набор из примерно 25 коннекторов с открытым исходным кодом (многие из них также перечислены в других местах).
9.2. Установка коннекторов из Confluent Hub
Корпоративная версия Confluent предоставляет сценарий для установки соединителей и других компонентов из Confluent Hub (сценарий не включен в версию с открытым исходным кодом). Если мы используем корпоративную версию, мы можем установить коннектор с помощью следующей команды:
$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview
9.3. Установка коннекторов вручную
Если нам нужен коннектор, которого нет в Confluent Hub, или если у нас есть версия Confluent с открытым исходным кодом, мы можем установить необходимые коннекторы вручную. Для этого нам нужно скачать и разархивировать коннектор, а также переместить включенные библиотеки в папку, указанную как plugin.path.
Для каждого коннектора в архиве должны быть две интересующие нас папки:
- Папка
lib
содержит jar коннектора, например,kafka-connect-mqtt-1.0.0-preview.jar
, а также еще несколько jar, необходимых коннектору - Папка
etc
содержит один или несколько эталонных файлов конфигурации .
Мы должны переместить папку lib в
$CONFLUENT_HOME/share/java
или любой другой путь, который мы указали как plugin.path
в connect-standalone.properties
и connect-distributed.properties
. При этом также может иметь смысл переименовать папку во что-то осмысленное.
Мы можем использовать файлы конфигурации из etc
, либо ссылаясь на них при запуске в автономном режиме, либо мы можем просто взять свойства и создать из них файл JSON.
10. Заключение
В этом руководстве мы рассмотрели, как установить и использовать Kafka Connect.
Мы рассмотрели типы разъемов, как исток, так и сток. Мы также рассмотрели некоторые функции и режимы, в которых может работать Connect. Затем мы рассмотрели трансформеры. И, наконец, мы узнали, где взять и как установить пользовательские коннекторы.
Как всегда, файлы конфигурации можно найти на GitHub .