Перейти к основному содержимому

Введение в коннекторы Kafka

· 12 мин. чтения

1. Обзор

Apache Kafka® — это распределенная платформа потоковой передачи. В предыдущем уроке мы обсуждали, как реализовать потребителей и производителей Kafka с помощью Spring .

В этом руководстве мы узнаем, как использовать коннекторы Kafka.

Мы рассмотрим:

  • Различные типы коннекторов Kafka
  • Функции и режимы Kafka Connect
  • Конфигурация соединителей с использованием файлов свойств, а также REST API

2. Основы Kafka Connect и Kafka Connectors

Kafka Connect — это фреймворк для подключения Kafka к внешним системам , таким как базы данных, хранилища ключей и значений, поисковые индексы и файловые системы, с помощью так называемых коннекторов .

Kafka Connectors — это готовые к использованию компоненты, которые могут помочь нам импортировать данные из внешних систем в темы Kafka и экспортировать данные из тем Kafka во внешние системы . Мы можем использовать существующие реализации соединителей для общих источников данных и приемников или реализовать собственные соединители.

Исходный коннектор собирает данные из системы. Исходные системы могут быть целыми базами данных, таблицами потоков или брокерами сообщений. Исходный коннектор также может собирать метрики с серверов приложений в темы Kafka, делая данные доступными для потоковой обработки с малой задержкой.

Соединитель приемника доставляет данные из разделов Kafka в другие системы, которые могут быть индексами, такими как Elasticsearch, пакетными системами, такими как Hadoop, или любой другой базой данных.

Некоторые соединители поддерживаются сообществом, а другие поддерживаются Confluent или его партнерами. Действительно, мы можем найти разъемы для большинства популярных систем, таких как S3, JDBC и Cassandra, и это лишь некоторые из них.

3. Особенности

Возможности Kafka Connect включают в себя:

  • Фреймворк для подключения внешних систем к Kafka — упрощает разработку, развертывание и управление коннекторами.
  • Распределенный и автономный режимы — это помогает нам развертывать большие кластеры, используя распределенный характер Kafka, а также настройки для разработки, тестирования и небольших производственных развертываний.
  • Интерфейс REST — мы можем управлять соединителями с помощью REST API.
  • Автоматическое управление смещением — Kafka Connect помогает нам обрабатывать процесс фиксации смещения, что избавляет нас от необходимости вручную реализовывать эту подверженную ошибкам часть разработки коннектора.
  • Распределенная и масштабируемая по умолчанию — Kafka Connect использует существующий протокол управления группами; мы можем добавить больше рабочих для масштабирования кластера Kafka Connect.
  • Потоковая и пакетная интеграция — Kafka Connect — идеальное решение для объединения систем потоковой и пакетной передачи данных в сочетании с существующими возможностями Kafka.
  • Преобразования — они позволяют нам вносить простые и легкие изменения в отдельные сообщения.

4. Настройка

Вместо использования простого дистрибутива Kafka мы загрузим Confluent Platform, дистрибутив Kafka, предоставленный Confluent, Inc., компанией, стоящей за Kafka. Confluent Platform поставляется с некоторыми дополнительными инструментами и клиентами по сравнению с простой Kafka, а также некоторыми дополнительными готовыми соединителями.

Для нашего случая достаточно версии с открытым исходным кодом, которую можно найти на сайте Confluent .

5. Быстрый старт Kafka Connect

Для начала мы обсудим принцип Kafka Connect, используя его самые основные соединители, которые являются соединителем источника файла и соединителем приемника файла .

Удобно, что Confluent Platform поставляется с обоими этими коннекторами, а также с эталонными конфигурациями.

5.1. Конфигурация исходного коннектора

Для исходного соединителя эталонная конфигурация доступна по адресу $CONFLUENT_HOME/etc/kafka/connect-file-source.properties :

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt

Эта конфигурация имеет некоторые свойства, общие для всех исходных соединителей:

  • имя — указанное пользователем имя экземпляра соединителя.
  • Connector.class указывает класс реализации, в основном вид соединителя
  • tasks.max указывает, сколько экземпляров нашего исходного коннектора должно работать параллельно, и
  • тема определяет тему, в которую коннектор должен отправлять выходные данные

В этом случае у нас также есть атрибут, специфичный для коннектора:

  • file определяет файл, из которого коннектор должен считывать ввод

Чтобы это работало, давайте создадим базовый файл с некоторым содержимым:

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

Обратите внимание, что рабочий каталог — $CONFLUENT_HOME.

5.2. Конфигурация разъема приемника

Для нашего соединителя приемника мы будем использовать эталонную конфигурацию в $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties :

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Логически он содержит точно такие же параметры, хотя на этот раз коннектор.класс указывает реализацию коннектора приемника, а файл — это место, куда коннектор должен записывать содержимое.

5.3. Конфигурация рабочего

Наконец, нам нужно настроить обработчик Connect, который объединит два наших коннектора и выполнит работу по чтению из коннектора-источника и записи в коннектор-приемник.

Для этого мы можем использовать $CONFLUENT_HOME/etc/kafka/connect-standalone.properties :

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java

Обратите внимание, что plugin.path может содержать список путей, по которым доступны реализации коннектора.

Поскольку мы будем использовать коннекторы в комплекте с Kafka, мы можем установить для plugin.path значение $CONFLUENT_HOME/share/java . При работе с Windows может потребоваться указать здесь абсолютный путь.

Для остальных параметров мы можем оставить значения по умолчанию:

  • bootstrap.servers содержит адреса брокеров Kafka
  • key.converter и value.converter определяют классы-конвертеры, которые сериализуют и десериализуют данные по мере их поступления из источника в Kafka, а затем из Kafka в приемник.
  • key.converter.schemas.enable и value.converter.schemas.enable — это настройки, специфичные для конвертера .
  • offset.storage.file.filename — самый важный параметр при работе Connect в автономном режиме: он определяет, где Connect должен хранить данные смещения.
  • offset.flush.interval.ms определяет интервал, с которым воркер пытается зафиксировать смещения для задач.

И список параметров довольно зрелый, поэтому ознакомьтесь с официальной документацией для получения полного списка.

5.4. Kafka Connect в автономном режиме

И с этим мы можем начать нашу первую настройку коннектора:

$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

Во-первых, мы можем проверить содержимое темы с помощью командной строки:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Как мы видим, исходный коннектор взял данные из файла test.txt , преобразовал их в JSON и отправил в Kafka:

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

И, если мы посмотрим на папку $CONFLUENT_HOME , мы увидим, что здесь был создан файл test.sink.txt :

cat $CONFLUENT_HOME/test.sink.txt
foo
bar

Поскольку коннектор приемника извлекает значение из атрибута полезной нагрузки и записывает его в файл назначения, данные в test.sink.txt содержат содержимое исходного файла test.txt .

Теперь добавим больше строк в test.txt.

Когда мы это делаем, мы видим, что коннектор источника автоматически обнаруживает эти изменения.

Нам нужно только убедиться, что в конце вставлена новая строка, иначе исходный коннектор не будет учитывать последнюю строку.

На этом этапе давайте остановим процесс Connect, так как мы запустим Connect в распределенном режиме несколькими строками.

6. REST API Connect

До сих пор мы выполняли все настройки, передавая файлы свойств через командную строку. Однако, поскольку Connect предназначен для работы в качестве службы, также доступен REST API.

По умолчанию он доступен по адресу http://localhost:8083 . Несколько конечных точек:

  • GET /connectors — возвращает список всех используемых коннекторов.
  • GET /connectors/{name} — возвращает сведения о конкретном соединителе.
  • POST /connectors — создает новый коннектор; тело запроса должно быть объектом JSON, содержащим поле строкового имени и поле конфигурации объекта с параметрами конфигурации коннектора.
  • GET /connectors/{name}/status — возвращает текущий статус коннектора, в том числе если он запущен, сбой или приостановлен, к какому рабочему процессу он назначен, информацию об ошибке, если он вышел из строя, и состояние всех его задач.
  • DELETE /connectors/{name} — удаляет коннектор, изящно останавливая все задачи и удаляя его конфигурацию.
  • GET /connector-plugins — возвращает список плагинов коннекторов, установленных в кластере Kafka Connect.

Официальная документация предоставляет список со всеми конечными точками.

В следующем разделе мы будем использовать REST API для создания новых соединителей.

7. Kafka Connect в распределенном режиме

Автономный режим идеально подходит для разработки и тестирования, а также для небольших установок. Однако, если мы хотим в полной мере использовать распределенную природу Kafka, мы должны запустить Connect в распределенном режиме.

При этом настройки коннектора и метаданные сохраняются в темах Kafka, а не в файловой системе. В результате рабочие узлы действительно не имеют состояния.

7.1. Запуск подключения

Эталонную конфигурацию для распределенного режима можно найти в $CONFLUENT_HOME /etc/kafka/connect-distributed.properties.

Параметры в основном такие же, как и для автономного режима. Отличий всего несколько:

  • group.id определяет имя кластерной группы Connect. Значение должно отличаться от любого идентификатора группы потребителей.
  • offset.storage.topic , config.storage.topic и status.storage.topic определяют темы для этих настроек. Для каждой темы мы также можем определить коэффициент репликации

Опять же, официальная документация предоставляет список со всеми параметрами.

Мы можем запустить Connect в распределенном режиме следующим образом:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

7.2. Добавление соединителей с помощью REST API

Теперь, по сравнению с автономной командой запуска, мы не передавали никаких конфигураций соединителя в качестве аргументов. Вместо этого мы должны создать коннекторы с помощью REST API.

Чтобы настроить наш предыдущий пример, мы должны отправить два запроса POST на адрес http://localhost:8083/connectors, содержащие следующие структуры JSON.

Во-первых, нам нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь мы назовем его connect-file-source.json :

{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-distributed.txt",
"topic": "connect-distributed"
}
}

Обратите внимание, как это выглядит очень похоже на эталонный файл конфигурации, который мы использовали в первый раз.

И затем мы POST это:

curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors

Затем мы сделаем то же самое для соединителя приемника, вызвав файл connect-file-sink.json :

{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-distributed.sink.txt",
"topics": "connect-distributed"
}
}

И выполните POST, как раньше:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors

При необходимости мы можем убедиться, что эта настройка работает правильно:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

И, если мы посмотрим на папку $CONFLUENT_HOME , мы увидим, что здесь был создан файл test-distributed.sink.txt :

cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar

После того, как мы протестировали распределенную установку, давайте очистим ее, удалив два соединителя:

curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink

8. Преобразование данных

8.1. Поддерживаемые преобразования

Преобразования позволяют нам вносить простые и легкие изменения в отдельные сообщения.

Kafka Connect поддерживает следующие встроенные преобразования:

  • InsertField — добавьте поле, используя либо статические данные, либо метаданные записи.
  • ReplaceField — фильтровать или переименовывать поля
  • MaskField — замените поле допустимым нулевым значением для типа (например, нулем или пустой строкой).
  • HoistField — Оберните все событие как одно поле внутри структуры или карты.
  • ExtractField — извлечение определенного поля из структуры и карты и включение в результаты только этого поля.
  • SetSchemaMetadata — изменение имени или версии схемы.
  • TimestampRouter — измените тему записи на основе исходной темы и временной метки.
  • RegexRouter — изменение темы записи на основе исходной темы, строки замены и регулярного выражения.

Преобразование настраивается с использованием следующих параметров:

  • transforms — разделенный запятыми список псевдонимов для преобразований.
  • transforms.$alias.type — Имя класса для преобразования
  • transforms.$alias.$transformationSpecificConfig — Конфигурация для соответствующего преобразования

8.2. Применение трансформатора

Чтобы протестировать некоторые функции преобразования, давайте настроим следующие два преобразования:

  • Во-первых, давайте обернем все сообщение как структуру JSON.
  • После этого добавим поле в эту структуру

Прежде чем применять наши преобразования, мы должны настроить Connect для использования JSON без схемы, изменив connect-distributed.properties :

key.converter.schemas.enable=false
value.converter.schemas.enable=false

После этого нам нужно перезапустить Connect, опять же в распределенном режиме:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

Опять же, нам нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь мы назовем его connect-file-source-transform.json.

Помимо уже известных параметров, добавим несколько строк для двух необходимых преобразований:

{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data_source",
"transforms.InsertSource.static.value": "test-file-source"
}
}

После этого давайте выполним POST:

curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors

Давайте напишем несколько строк в наш test-transformation.txt :

Foo
Bar

Если мы теперь проверим тему подключения-преобразования , мы должны получить следующие строки:

{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}

9. Использование готовых коннекторов

После использования этих простых соединителей давайте посмотрим на более продвинутые, готовые к использованию соединители и способы их установки.

9.1. Где найти соединители

Готовые соединители доступны из разных источников:

  • Несколько соединителей связаны с простым Apache Kafka (источник и приемник для файлов и консоли).

  • Еще несколько коннекторов входят в состав Confluent Platform (ElasticSearch, HDFS, JDBC и AWS S3).

  • Также проверьте Confluent Hub , который является своего рода магазином приложений для коннекторов Kafka. Количество предлагаемых коннекторов постоянно растет:

  • Коннекторы Confluent (разработаны, протестированы, задокументированы и полностью поддерживаются Confluent)

  • Сертифицированные коннекторы (реализованы сторонней организацией и сертифицированы Confluent)

  • Коннекторы, разработанные и поддерживаемые сообществом

  • Кроме того, Confluent также предоставляет страницу соединителей с некоторыми соединителями, которые также доступны в концентраторе Confluent, а также с некоторыми другими соединителями сообщества.

  • И, наконец, есть производители, которые предоставляют коннекторы как часть своего продукта. Например, Landoop предоставляет потоковую библиотеку под названием Lenses , которая также содержит набор из примерно 25 коннекторов с открытым исходным кодом (многие из них также перечислены в других местах).

9.2. Установка коннекторов из Confluent Hub

Корпоративная версия Confluent предоставляет сценарий для установки соединителей и других компонентов из Confluent Hub (сценарий не включен в версию с открытым исходным кодом). Если мы используем корпоративную версию, мы можем установить коннектор с помощью следующей команды:

$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

9.3. Установка коннекторов вручную

Если нам нужен коннектор, которого нет в Confluent Hub, или если у нас есть версия Confluent с открытым исходным кодом, мы можем установить необходимые коннекторы вручную. Для этого нам нужно скачать и разархивировать коннектор, а также переместить включенные библиотеки в папку, указанную как plugin.path.

Для каждого коннектора в архиве должны быть две интересующие нас папки:

  • Папка lib содержит jar коннектора, например, kafka-connect-mqtt-1.0.0-preview.jar , а также еще несколько jar, необходимых коннектору
  • Папка etc содержит один или несколько эталонных файлов конфигурации .

Мы должны переместить папку lib в $CONFLUENT_HOME/share/java или любой другой путь, который мы указали как plugin.path в connect-standalone.properties и connect-distributed.properties . При этом также может иметь смысл переименовать папку во что-то осмысленное.

Мы можем использовать файлы конфигурации из etc , либо ссылаясь на них при запуске в автономном режиме, либо мы можем просто взять свойства и создать из них файл JSON.

10. Заключение

В этом руководстве мы рассмотрели, как установить и использовать Kafka Connect.

Мы рассмотрели типы разъемов, как исток, так и сток. Мы также рассмотрели некоторые функции и режимы, в которых может работать Connect. Затем мы рассмотрели трансформеры. И, наконец, мы узнали, где взять и как установить пользовательские коннекторы.

Как всегда, файлы конфигурации можно найти на GitHub .