Перейти к основному содержимому

Получение списка активных брокеров в кластере Kafka с помощью команд оболочки

· 5 мин. чтения

1. Обзор

Мониторинг управляемой событиями системы, использующей кластер Apache Kafka, часто требует от нас получения списка активных брокеров. В этом руководстве мы рассмотрим несколько команд оболочки, чтобы получить список активных брокеров в работающем кластере.

2. Настройка

Для целей этой статьи давайте воспользуемся приведенным ниже файлом docker-compose.yml для настройки двухузлового кластера Kafka : ``

$ cat docker-compose.yml
---
version: '2'
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181

kafka-1:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper-1
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper-1
ports:
- 39092:39092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Теперь давайте раскрутим кластер Kafka с помощью команды docker-compose :

$ docker-compose up -d

Мы можем убедиться, что сервер Zookeeper прослушивает порт 2181 , а брокеры Kafka прослушивают порты 29092 и 39092 соответственно:

$ ports=(2181 29092 39092)
$ for port in $ports
do
nc -z localhost $port
done
Connection to localhost port 2181 [tcp/eforward] succeeded!
Connection to localhost port 29092 [tcp/*] succeeded!
Connection to localhost port 39092 [tcp/*] succeeded!

3. Использование API-интерфейсов Zookeeper

В кластере Kafka сервер Zookeeper хранит метаданные, относящиеся к серверам брокера Kafka . Итак, давайте воспользуемся API-интерфейсами файловой системы, предоставляемыми Zookeeper, чтобы получить сведения о брокере.

3.1. Команда zookeeper-shell

Большинство дистрибутивов Kafka поставляются с бинарным файлом zookeeper-shell или zookeeper-shell.sh . Таким образом, использование этого двоичного файла для взаимодействия с сервером Zookeeper является стандартом де-факто.

Во-первых, давайте подключимся к серверу Zookeeper, работающему по адресу localhost:2181 :

$ /usr/local/bin/zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!

Как только мы подключимся к серверу Zookeeper, мы можем выполнить типичные команды файловой системы, такие как ls , чтобы получить информацию о метаданных, хранящуюся на сервере. Давайте найдем идентификаторы брокеров, которые в настоящее время живы:

ls /brokers/ids
[1, 2]

Мы видим, что в настоящее время есть два активных брокера с идентификаторами 1 и 2. Используя команду get , мы можем получить более подробную информацию для конкретного брокера с заданным идентификатором:

get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2:9092","PLAINTEXT_HOST://localhost:39092"],"jmx_port":-1,"port":9092,"host":"kafka-2","version":5,"timestamp":"1625336133967"}

Обратите внимание, что посредник с id=1 прослушивает порт 29092 , а второй посредник с id=2 прослушивает порт 39092 .

Наконец, чтобы выйти из оболочки Zookeeper, мы можем использовать команду quit :

quit

3.2. Команда zkCli

Так же, как дистрибутивы Kafka поставляются с бинарным файлом zookeeper-shell , дистрибутивы Zookeeper поставляются с бинарным файлом zkCli или zkCli.sh .

Таким образом, взаимодействие с zkCli точно такое же, как и с zookeeper-shell , поэтому давайте продолжим и подтвердим, что мы можем получить необходимые данные для брокера с id=1 :

$ zkCli -server localhost:2181 get /brokers/ids/1
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}

Как и ожидалось, мы видим, что данные брокера, полученные с помощью zookeeper-shell , совпадают с данными, полученными с помощью zkCli .

4. Использование API версии брокера

Иногда у нас может быть неполный список активных брокеров, и мы хотим получить все доступные брокеры в кластере. В таком случае мы можем использовать команду kafka -broker-api-versions, поставляемую с дистрибутивами Kafka.

Предположим, что мы знаем о брокере, работающем по адресу localhost:29092 , поэтому попробуем узнать всех активных брокеров, участвующих в кластере Kafka:

$ kafka-broker-api-versions --bootstrap-server localhost:29092 | awk '/id/{print $1}'
localhost:39092
localhost:29092

Стоит отметить, что мы использовали команду awk для фильтрации вывода и отображения только адреса брокера. Далее результат правильно показывает, что в кластере два активных брокера.

Хотя этот подход выглядит проще, чем подход CLI Zookeeper, двоичный файл kafka-broker-api-versions является лишь недавним дополнением к дистрибутиву Kafka.

5. Сценарий оболочки

В любом практическом сценарии выполнение команд zkCli или zookeeper-shell вручную для каждого брокера было бы утомительным . Итак, давайте напишем сценарий Shell , который принимает адрес сервера Zookeeper в качестве входных данных и взамен дает нам список всех активных брокеров.

5.1. Вспомогательные функции

Напишем все вспомогательные функции в `` скрипте functions.sh :

$ cat functions.sh
#!/bin/bash
ZOOKEEPER_SERVER="${1:-localhost:2181}"

# Helper Functions Below

Во-первых, давайте напишем функцию get_broker_ids , чтобы получить набор идентификаторов активных брокеров , которые будут вызывать команду zkCli внутри:

function get_broker_ids {
broker_ids_out=$(zkCli -server $ZOOKEEPER_SERVER <<EOF
ls /brokers/ids
quit
EOF
)
broker_ids_csv="$(echo "${broker_ids_out}" | grep '^\[.*\]$')"
echo "$broker_ids_csv" | sed 's/\[//;s/]//;s/,/ /'
}

Далее давайте напишем функцию get_broker_details для получения подробной информации о брокере с использованием Broker_id :

function get_broker_details {
broker_id="$1"
echo "$(zkCli -server $ZOOKEEPER_SERVER <<EOF
get /brokers/ids/$broker_id
quit
EOF
)"
}

Теперь, когда у нас есть подробные сведения о брокере, давайте напишем функцию parse_broker_endpoint , чтобы получить сведения о конечной точке брокера:

function parse_endpoint_detail {
broker_detail="$1"
json="$(echo "$broker_detail" | grep '^{.*}$')"
json_endpoints="$(echo $json | jq .endpoints)"
echo "$(echo $json_endpoints |jq . | grep HOST | tr -d " ")"
}

Внутри мы использовали команду jq для парсинга JSON . ``

5.2. Основной сценарий

Теперь давайте напишем основной скрипт get_all_active_brokers.sh , который использует вспомогательные функции, определенные в functions.sh :

$ cat get_all_active_brokers.sh
#!/bin/bash
. functions.sh "$1"

function get_all_active_brokers {
broker_ids=$(get_broker_ids)
for broker_id in $broker_ids
do
broker_details="$(get_broker_details $broker_id)"
broker_endpoint=$(parse_endpoint_detail "$broker_details")
echo "broker_id="$broker_id,"endpoint="$broker_endpoint
done
}

get_all_active_brokers

Мы можем заметить, что мы перебрали все Broker_id в функции get_all_active_brokers , чтобы агрегировать конечные точки для всех активных брокеров.

Наконец, давайте выполним скрипт get_all_active_brokers.sh , чтобы увидеть список активных брокеров для нашего двухузлового кластера Kafka:

$ ./get_all_active_brokers.sh localhost:2181
broker_id=1,endpoint="PLAINTEXT_HOST://localhost:29092"
broker_id=2,endpoint="PLAINTEXT_HOST://localhost:39092"

Мы видим, что результаты точны. Похоже, нам это удалось!

6. Заключение

В этом руководстве мы узнали о командах оболочки, таких как zookeeper-shell , zkCli и kafka-broker-api-versions , чтобы получить список активных брокеров в кластере Kafka. Кроме того, мы написали сценарий оболочки для автоматизации процесса поиска сведений о брокере в реальных сценариях.