Перейти к основному содержимому

Настройка периода хранения сообщений в Apache Kafka

· 5 мин. чтения

1. Обзор

Когда производитель отправляет сообщение в Apache Kafka, он добавляет его в файл журнала и сохраняет в течение настроенного периода времени.

В этом руководстве мы научимся настраивать свойства хранения сообщений на основе времени для тем Kafka .

2. Удержание на основе времени

При заданных свойствах срока хранения сообщения имеют TTL (время жизни) . По истечении срока действия сообщения помечаются для удаления, тем самым освобождая место на диске.

Одно и то же свойство срока хранения применяется ко всем сообщениям в заданной теме Kafka. Кроме того, мы можем установить эти свойства либо перед созданием темы, либо изменить их во время выполнения для уже существующей темы .

В следующих разделах мы узнаем, как настроить это с помощью конфигурации брокера для установки периода хранения для новых тем и конфигурации на уровне темы для управления им во время выполнения .

3. Конфигурация на уровне сервера

Apache Kafka поддерживает политику хранения на уровне сервера, которую мы можем настроить, настроив ровно одно из трех свойств конфигурации на основе времени :

  • log.retention.hours
  • лог.удержания.минут
  • log.retention.ms

Важно понимать, что Kafka заменяет значение с меньшей точностью более высоким. Таким образом, log.retention.ms будет иметь наивысший приоритет .

3.1. Основы

Во-первых, давайте проверим значение по умолчанию для хранения, выполнив команду grep из каталога Apache Kafka :

$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168

Здесь мы можем заметить, что срок хранения по умолчанию составляет семь дней .

Чтобы сообщения сохранялись только в течение десяти минут, мы можем установить значение свойства log.retention.minutes в config/server.properties :

log.retention.minutes=10

3.2. Срок хранения для новой темы

Пакет Apache Kafka содержит несколько сценариев оболочки, которые мы можем использовать для выполнения административных задач. Мы будем использовать их для создания вспомогательного скрипта functions.sh , который мы будем использовать в ходе этого руководства .

Начнем с добавления двух функций в functions.sh для создания топика и описания его настройки соответственно:

function create_topic {
topic_name="$1"
bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
--partitions 1 --replication-factor 1 \
--zookeeper localhost:2181
}

function describe_topic_config {
topic_name="$1"
./bin/kafka-configs.sh --describe --all \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}

Далее давайте создадим два автономных скрипта, create-topic.sh и get-topic-retention-time.sh :

bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?

Мы должны отметить, что description_topic_config предоставит все свойства, настроенные для темы. Итак, мы использовали однострочник awk , чтобы добавить фильтр для свойства Retention.ms .

Наконец, давайте запустим среду Kafka и проверим конфигурацию периода хранения для нового примера темы:

bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000

После того, как тема будет создана и описана, мы заметим, что для параметраtention.ms задано значение 600000 (десять минут). На самом деле это производное от свойства log.retention.minutes , которое мы ранее определили в файле server.properties .

4. Конфигурация на уровне темы

После запуска сервера Broker свойства log.retention.{hours|minutes|ms} на уровне сервера становятся доступными только для чтения . С другой стороны, мы получаем доступ к свойству Retention.ms , которое мы можем настроить на уровне темы.

Давайте добавим в наш скрипт functions.sh метод для настройки свойства топика:

function alter_topic_config {
topic_name="$1"
config_name="$2"
config_value="$3"
./bin/kafka-configs.sh --alter \
--add-config ${config_name}=${config_value} \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}

Затем мы можем использовать это в скрипте alter-topic-config.sh :

#!/bin/sh
. ./functions.sh

alter_topic_retention_config $1 $2 $3
exit $?

Наконец, давайте установим время хранения на пять минут для тестовой темы и проверим то же самое:

bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.

bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000

5. Проверка

До сих пор мы видели, как можно настроить срок хранения сообщения в теме Kafka. Пришло время убедиться, что срок действия сообщения действительно истекает после истечения срока хранения.

5.1. Производитель-Потребитель

Давайте добавим функции product_message и Consumer_message в файл functions.sh. Внутри они используют kafka-console-producer.sh и kafka-console-consumer.sh соответственно для создания/потребления сообщения:

function produce_message {
topic_name="$1"
message="$2"
echo "${message}" | ./bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}

function consume_message {
topic_name="$1"
timeout="$2"
./bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning \
--topic ${topic_name} \
--max-messages 1 \
--timeout-ms $timeout
}

Мы должны отметить, что потребитель всегда читает сообщения с самого начала , так как нам нужен потребитель, который читает любое доступное сообщение в Kafka .

Далее давайте создадим автономного производителя сообщений:

bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"

produce_message ${topic_name} ${message}
exit $?

Наконец, у нас есть автономный потребитель сообщений:

bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"

consume_message ${topic_name} $timeout
exit $?

5.2. Срок действия сообщения

Теперь, когда у нас есть готовые базовые настройки, давайте создадим одно сообщение и мгновенно воспользуемся им дважды:

bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages

Итак, мы видим, что потребитель повторно использует любое доступное сообщение.

Теперь давайте введем задержку сна на пять минут, а затем попытаемся обработать сообщение:

bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

Как и ожидалось, потребитель не нашел ни одного сообщения для использования, поскольку срок хранения сообщения истек .

6. Ограничения

Внутри Kafka Broker поддерживает другое свойство, называемое log.retention.check.interval.ms. Это свойство определяет частоту, с которой сообщения проверяются на истечение срока действия.

Таким образом, чтобы сохранить эффективность политики хранения, мы должны убедиться, что значение log.retention.check.interval.ms ниже, чем значение свойства Retention.ms для любой заданной темы.

7. Заключение

В этом руководстве мы изучили Apache Kafka, чтобы понять политику хранения сообщений на основе времени . В процессе мы создали простые сценарии оболочки, чтобы упростить административную деятельность. Позже мы создали автономный потребитель и производитель для проверки истечения срока действия сообщения после периода хранения.