Перейти к основному содержимому

Руководство по очистке темы Apache Kafka

· 4 мин. чтения

1. Обзор

В этой статье мы рассмотрим несколько стратегий очистки данных из раздела Apache Kafka .

2. Сценарий очистки

Прежде чем мы изучим стратегии очистки данных, давайте познакомимся с простым сценарием, требующим действия по очистке.

2.1. Сценарий

Сообщения в Apache Kafka автоматически удаляются после настроенного времени хранения . Тем не менее, в некоторых случаях мы можем захотеть, чтобы удаление сообщения произошло немедленно.

Давайте представим, что в код приложения, которое генерирует сообщения в топике Kafka, был внесен дефект. К тому времени, когда будет интегрировано исправление ошибки, у нас уже есть много поврежденных сообщений в теме Kafka , которые готовы к употреблению.

Такие проблемы чаще всего встречаются в среде разработки, и нам нужны быстрые результаты. Так что массовое удаление сообщений — дело разумное.

2.2. Моделирование

Чтобы смоделировать сценарий, давайте начнем с создания темы purge-scenario из каталога установки Kafka:

$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181

Далее воспользуемся командой shuf для генерации случайных данных и передачи их сценарию kafka-console-producer.sh :

$ /usr/bin/shuf -i 1-100000 -n 50000000 \
| tee -a /tmp/kafka-random-data \
| bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario

Мы должны отметить, что мы использовали команду tee для сохранения данных моделирования для последующего использования.

Наконец, давайте проверим, что потребитель может потреблять сообщения из темы:

$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 3
76696
49425
1744
Processed a total of 3 messages

3. Срок действия сообщения

Сообщения, созданные в теме purge-scenario , будут иметь срок хранения по умолчанию , равный семи дням. Чтобы очистить сообщения, мы можем временно сбросить свойство уровня темы Retention.ms на десять секунд и дождаться истечения срока действия сообщений:

$ bin/kafka-configs.sh --alter \
--add-config retention.ms=10000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario \
&& sleep 10

Далее, давайте проверим, истек ли срок действия сообщений из темы:

$ bin/kafka-console-consumer.sh  \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

Наконец, мы можем восстановить исходный семидневный срок хранения темы:

$ bin/kafka-configs.sh --alter \
--add-config retention.ms=604800000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario

При таком подходе Kafka будет удалять сообщения во всех разделах для темы purge-scenario .

4. Выборочное удаление записи

Иногда нам может понадобиться выборочно удалить записи в одном или нескольких разделах из определенной темы . Мы можем удовлетворить такие требования, используя скрипт kafka-delete-records.sh .

Во-первых, нам нужно указать смещение на уровне раздела в файле конфигурации delete-config.json .

Давайте очистим все сообщения от partition=1 , используя offset=-1 :

{
"partitions": [
{
"topic": "purge-scenario",
"partition": 1,
"offset": -1
}
],
"version": 1
}

Далее приступим к удалению записи:

$ bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-config.json

Мы можем убедиться, что мы все еще можем читать из partition=0 :

$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario --partition=0 \
--max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages

Однако, когда мы читаем из partition=1 , записей для обработки не будет:

$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--partition=1 \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

5. Удалите и заново создайте тему

Другой обходной путь для очистки всех сообщений темы Kafka — удалить и создать ее заново. Однако это возможно только в том случае, если мы установим для свойства delete.topic.enable значение true при запуске сервера Kafka : ** `` **

$ bin/kafka-server-start.sh config/server.properties \
--override delete.topic.enable=true

Чтобы удалить тему, мы можем использовать скрипт kafka-topics.sh :

$ bin/kafka-topics.sh \
--delete --topic purge-scenario \
--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Давайте проверим это, перечислив тему:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

Убедившись, что темы больше нет в списке, мы можем продолжить и воссоздать ее.

6. Заключение

В этом руководстве мы смоделировали сценарий, в котором нам нужно очистить тему Apache Kafka. Более того, мы рассмотрели несколько стратегий полной или выборочной очистки данных по разделам .