1. Обзор
Мониторинг управляемой событиями системы, использующей кластер Apache Kafka, часто требует от нас получения списка активных брокеров. В этом руководстве мы рассмотрим несколько команд оболочки, чтобы получить список активных брокеров в работающем кластере.
2. Настройка
Для целей этой статьи давайте воспользуемся приведенным ниже файлом docker-compose.yml
для настройки двухузлового кластера Kafka : ``
$ cat docker-compose.yml
---
version: '2'
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka-1:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper-1
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper-1
ports:
- 39092:39092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Теперь давайте раскрутим кластер Kafka с помощью команды docker-compose
:
$ docker-compose up -d
Мы можем убедиться, что сервер Zookeeper прослушивает порт 2181
, а брокеры Kafka прослушивают порты 29092
и 39092
соответственно:
$ ports=(2181 29092 39092)
$ for port in $ports
do
nc -z localhost $port
done
Connection to localhost port 2181 [tcp/eforward] succeeded!
Connection to localhost port 29092 [tcp/*] succeeded!
Connection to localhost port 39092 [tcp/*] succeeded!
3. Использование API-интерфейсов Zookeeper
В кластере Kafka сервер Zookeeper хранит метаданные, относящиеся к серверам брокера Kafka . Итак, давайте воспользуемся API-интерфейсами файловой системы, предоставляемыми Zookeeper, чтобы получить сведения о брокере.
3.1. Команда zookeeper-shell
Большинство дистрибутивов Kafka поставляются с бинарным файлом zookeeper-shell
или zookeeper-shell.sh
. Таким образом, использование этого двоичного файла для взаимодействия с сервером Zookeeper является стандартом де-факто.
Во-первых, давайте подключимся к серверу Zookeeper, работающему по адресу localhost:2181
:
$ /usr/local/bin/zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
Как только мы подключимся к серверу Zookeeper, мы можем выполнить типичные команды файловой системы, такие как ls
, чтобы получить информацию о метаданных, хранящуюся на сервере. Давайте найдем идентификаторы брокеров, которые в настоящее время живы:
ls /brokers/ids
[1, 2]
Мы видим, что в настоящее время есть два активных брокера с идентификаторами 1 и 2. Используя команду get
, мы можем получить более подробную информацию для конкретного брокера с заданным идентификатором:
get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2:9092","PLAINTEXT_HOST://localhost:39092"],"jmx_port":-1,"port":9092,"host":"kafka-2","version":5,"timestamp":"1625336133967"}
Обратите внимание, что посредник с id=1
прослушивает порт 29092
, а второй посредник с id=2
прослушивает порт 39092
.
Наконец, чтобы выйти из оболочки Zookeeper, мы можем использовать команду quit
:
quit
3.2. Команда zkCli
Так же, как дистрибутивы Kafka поставляются с бинарным файлом zookeeper-shell
, дистрибутивы Zookeeper поставляются с бинарным файлом zkCli
или zkCli.sh
.
Таким образом, взаимодействие с zkCli
точно такое же, как и с zookeeper-shell
, поэтому давайте продолжим и подтвердим, что мы можем получить необходимые данные для брокера с id=1
:
$ zkCli -server localhost:2181 get /brokers/ids/1
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
Как и ожидалось, мы видим, что данные брокера, полученные с помощью zookeeper-shell
, совпадают с данными, полученными с помощью zkCli
.
4. Использование API версии брокера
Иногда у нас может быть неполный список активных брокеров, и мы хотим получить все доступные брокеры в кластере. В таком случае мы можем использовать команду kafka -broker-api-versions,
поставляемую с дистрибутивами Kafka.
Предположим, что мы знаем о брокере, работающем по адресу localhost:29092
, поэтому попробуем узнать всех активных брокеров, участвующих в кластере Kafka:
$ kafka-broker-api-versions --bootstrap-server localhost:29092 | awk '/id/{print $1}'
localhost:39092
localhost:29092
Стоит отметить, что мы использовали команду awk
для фильтрации вывода и отображения только адреса брокера. Далее результат правильно показывает, что в кластере два активных брокера.
Хотя этот подход выглядит проще, чем подход CLI Zookeeper, двоичный файл kafka-broker-api-versions
является лишь недавним дополнением к дистрибутиву Kafka.
5. Сценарий оболочки
В любом практическом сценарии выполнение команд zkCli
или zookeeper-shell
вручную для каждого брокера было бы утомительным . Итак, давайте напишем сценарий Shell , который принимает адрес сервера Zookeeper в качестве входных данных и взамен дает нам список всех активных брокеров.
5.1. Вспомогательные функции
Напишем все вспомогательные функции в `` скрипте functions.sh :
$ cat functions.sh
#!/bin/bash
ZOOKEEPER_SERVER="${1:-localhost:2181}"
# Helper Functions Below
Во-первых, давайте напишем функцию get_broker_ids
, чтобы получить набор идентификаторов активных брокеров , которые будут вызывать команду zkCli
внутри:
function get_broker_ids {
broker_ids_out=$(zkCli -server $ZOOKEEPER_SERVER <<EOF
ls /brokers/ids
quit
EOF
)
broker_ids_csv="$(echo "${broker_ids_out}" | grep '^\[.*\]$')"
echo "$broker_ids_csv" | sed 's/\[//;s/]//;s/,/ /'
}
Далее давайте напишем функцию get_broker_details
для получения подробной информации о брокере с использованием Broker_id
:
function get_broker_details {
broker_id="$1"
echo "$(zkCli -server $ZOOKEEPER_SERVER <<EOF
get /brokers/ids/$broker_id
quit
EOF
)"
}
Теперь, когда у нас есть подробные сведения о брокере, давайте напишем функцию parse_broker_endpoint
, чтобы получить сведения о конечной точке брокера:
function parse_endpoint_detail {
broker_detail="$1"
json="$(echo "$broker_detail" | grep '^{.*}$')"
json_endpoints="$(echo $json | jq .endpoints)"
echo "$(echo $json_endpoints |jq . | grep HOST | tr -d " ")"
}
Внутри мы использовали команду jq
для парсинга JSON . ``
5.2. Основной сценарий
Теперь давайте напишем основной скрипт get_all_active_brokers.sh
, который использует вспомогательные функции, определенные в functions.sh
:
$ cat get_all_active_brokers.sh
#!/bin/bash
. functions.sh "$1"
function get_all_active_brokers {
broker_ids=$(get_broker_ids)
for broker_id in $broker_ids
do
broker_details="$(get_broker_details $broker_id)"
broker_endpoint=$(parse_endpoint_detail "$broker_details")
echo "broker_id="$broker_id,"endpoint="$broker_endpoint
done
}
get_all_active_brokers
Мы можем заметить, что мы перебрали все Broker_id
в функции get_all_active_brokers
, чтобы агрегировать конечные точки для всех активных брокеров.
Наконец, давайте выполним скрипт get_all_active_brokers.sh
, чтобы увидеть список активных брокеров для нашего двухузлового кластера Kafka:
$ ./get_all_active_brokers.sh localhost:2181
broker_id=1,endpoint="PLAINTEXT_HOST://localhost:29092"
broker_id=2,endpoint="PLAINTEXT_HOST://localhost:39092"
Мы видим, что результаты точны. Похоже, нам это удалось!
6. Заключение
В этом руководстве мы узнали о командах оболочки, таких как zookeeper-shell
, zkCli
и kafka-broker-api-versions
, чтобы получить список активных брокеров в кластере Kafka. Кроме того, мы написали сценарий оболочки для автоматизации процесса поиска сведений о брокере в реальных сценариях.