1. Обзор
В этой статье мы рассмотрим несколько стратегий очистки данных из раздела Apache Kafka .
2. Сценарий очистки
Прежде чем мы изучим стратегии очистки данных, давайте познакомимся с простым сценарием, требующим действия по очистке.
2.1. Сценарий
Сообщения в Apache Kafka автоматически удаляются после настроенного времени хранения . Тем не менее, в некоторых случаях мы можем захотеть, чтобы удаление сообщения произошло немедленно.
Давайте представим, что в код приложения, которое генерирует сообщения в топике Kafka, был внесен дефект. К тому времени, когда будет интегрировано исправление ошибки, у нас уже есть много поврежденных сообщений в теме Kafka , которые готовы к употреблению.
Такие проблемы чаще всего встречаются в среде разработки, и нам нужны быстрые результаты. Так что массовое удаление сообщений — дело разумное.
2.2. Моделирование
Чтобы смоделировать сценарий, давайте начнем с создания темы purge-scenario
из каталога установки Kafka:
$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181
Далее воспользуемся командой shuf
для генерации случайных данных и передачи их сценарию kafka-console-producer.sh
:
$ /usr/bin/shuf -i 1-100000 -n 50000000 \
| tee -a /tmp/kafka-random-data \
| bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
Мы должны отметить, что мы использовали команду tee
для сохранения данных моделирования для последующего использования.
Наконец, давайте проверим, что потребитель может потреблять сообщения из темы:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 3
76696
49425
1744
Processed a total of 3 messages
3. Срок действия сообщения
Сообщения, созданные в теме purge-scenario
, будут иметь срок хранения по умолчанию , равный семи дням. Чтобы очистить сообщения, мы можем временно сбросить свойство уровня темы Retention.ms на
десять секунд и дождаться истечения срока действия сообщений:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=10000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario \
&& sleep 10
Далее, давайте проверим, истек ли срок действия сообщений из темы:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
Наконец, мы можем восстановить исходный семидневный срок хранения темы:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=604800000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
При таком подходе Kafka будет удалять сообщения во всех разделах для темы purge-scenario .
4. Выборочное удаление записи
Иногда нам может понадобиться выборочно удалить записи в одном или нескольких разделах из определенной темы . Мы можем удовлетворить такие требования, используя скрипт kafka-delete-records.sh
.
Во-первых, нам нужно указать смещение на уровне раздела в файле конфигурации delete-config.json
.
Давайте очистим все сообщения от partition=1
, используя offset=-1
:
{
"partitions": [
{
"topic": "purge-scenario",
"partition": 1,
"offset": -1
}
],
"version": 1
}
Далее приступим к удалению записи:
$ bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-config.json
Мы можем убедиться, что мы все еще можем читать из partition=0
:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario --partition=0 \
--max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages
Однако, когда мы читаем из partition=1
, записей для обработки не будет:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--partition=1 \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
5. Удалите и заново создайте тему
Другой обходной путь для очистки всех сообщений темы Kafka — удалить и создать ее заново. Однако это возможно только в том случае, если мы установим для свойства delete.topic.enable
значение true
при запуске сервера Kafka : ** `` **
$ bin/kafka-server-start.sh config/server.properties \
--override delete.topic.enable=true
Чтобы удалить тему, мы можем использовать скрипт kafka-topics.sh
:
$ bin/kafka-topics.sh \
--delete --topic purge-scenario \
--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Давайте проверим это, перечислив тему:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
Убедившись, что темы больше нет в списке, мы можем продолжить и воссоздать ее.
6. Заключение
В этом руководстве мы смоделировали сценарий, в котором нам нужно очистить тему Apache Kafka. Более того, мы рассмотрели несколько стратегий полной или выборочной очистки данных по разделам .