Перейти к основному содержимому

Отправляйте большие сообщения с помощью Kafka

· 6 мин. чтения

1. Обзор

Apache Kafka — это мощная распределенная отказоустойчивая платформа для потоковой передачи событий с открытым исходным кодом. Однако, когда мы используем Kafka для отправки сообщений, размер которых превышает установленный лимит, возникает ошибка.

Мы показали , как работать со Spring и Kafka в предыдущем уроке. В этом уроке мы рассмотрим способ отправки больших сообщений с помощью Kafka.

2. Постановка задачи

Конфигурация Kafka ограничивает размер сообщений, которые разрешено отправлять. По умолчанию это ограничение составляет 1 МБ. Однако, если требуется отправлять большие сообщения, нам нужно настроить эти конфигурации в соответствии с нашими требованиями.

В этом уроке мы используем Kafka v2.5. Давайте сначала посмотрим на нашу настройку Kafka, прежде чем переходить к настройке.

3. Настройка

Здесь мы собираемся использовать базовую настройку Kafka с одним брокером. Кроме того, приложение-производитель может отправлять сообщения по определенной теме в Kafka Broker с помощью клиента Kafka. Кроме того, мы используем тему с одним разделом:

./8dbca5ed4e1389ef789bb1c35b668424.png

Здесь мы можем наблюдать несколько точек взаимодействия, таких как Kafka Producer, Kafka Broker, Topic и Kafka Consumer. Следовательно, все они нуждаются в обновлениях конфигурации, чтобы иметь возможность отправлять большие сообщения с одного конца на другой .

Давайте подробно рассмотрим эти конфиги, чтобы отправить большое сообщение размером 20 МБ.

3. Конфигурация производителя Kafka

Это первое место, откуда исходит наше сообщение. И мы используем Spring Kafka для отправки сообщений из нашего приложения на сервер Kafka.

Следовательно, сначала необходимо обновить свойство « max.request.size » . Дополнительные подробности об этом конфиге производителя доступны в Kafka Documentation . Это доступно как константа ProducerConfig.MAX_REQUEST_SIZE_CONFIG в клиентской библиотеке Kafka, которая доступна как часть зависимости Spring Kafka.

Давайте настроим это значение на 20971520 байт:

public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

return new DefaultKafkaProducerFactory<>(configProps);
}

4. Конфигурация темы Кафки

Наше приложение для создания сообщений отправляет сообщения в Kafka Broker по определенной теме. Следовательно, следующим требованием является настройка используемой темы Kafka. Это означает, что нам нужно обновить свойство max.message.bytes со значением по умолчанию 1 МБ.

Он содержит значение наибольшего размера пакета записи Kafka после сжатия (если сжатие включено). Дополнительные подробности доступны в Kafka Documentation .

Настроим это свойство вручную во время создания топика с помощью команды CLI:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520

Кроме того, мы можем настроить это свойство через клиент Kafka:

public NewTopic topic() {
NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
Map<String, String> configs = new HashMap<>();
configs.put("max.message.bytes", "20971520");
newTopic.configs(configs);
return newTopic;
}

Как минимум, нам нужно настроить эти два свойства.

5. Конфигурация брокера Kafka

Необязательное свойство конфигурации « message.max.bytes » может использоваться, чтобы разрешить всем темам на брокере принимать сообщения размером более 1 МБ.

И это содержит значение наибольшего размера пакета записи, разрешенного Kafka после сжатия (если сжатие включено). Дополнительные подробности доступны в Kafka Documentation .

Давайте добавим это свойство в конфигурационный файл Kafka Broker server.properties :

message.max.bytes=20971520

Кроме того, максимальное значение между « message.max.bytes» и « max.message.bytes » будет эффективным используемым значением.

6. Конфигурация потребителя

Давайте рассмотрим параметры конфигурации, доступные для потребителя Kafka. Хотя эти изменения не являются обязательными для обработки больших сообщений, отказ от них может повлиять на производительность приложения-потребителя. Следовательно, хорошо иметь и эти конфиги:

  • max.partition.fetch.bytes : это свойство ограничивает количество байтов, которое потребитель может получить из раздела темы. Дополнительные подробности доступны в Kafka Documentation . Это доступно как константа с именем ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG `` в клиентской библиотеке Kafka.
  • fetch.max.bytes : это свойство ограничивает количество байтов, которые потребитель может получить с самого сервера Kafka. Потребитель Kafka также может прослушивать несколько разделов. Дополнительные подробности доступны в Kafka Documentation . Это доступно как константа ConsumerConfig.FETCH_MAX_BYTES_CONFIG в клиентской библиотеке Kafka.

Поэтому для настройки наших потребителей нам нужно создать KafkaConsumerFactory . Помните, что нам всегда нужно использовать более высокое значение по сравнению с конфигурацией темы/брокера :

public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
return new DefaultKafkaConsumerFactory<>(props);
}

Здесь мы использовали одно и то же значение конфигурации 20971520 байт для обоих свойств, потому что мы используем один раздел Topic . Однако значение FETCH_MAX_BYTES_CONFIG должно быть больше, чем MAX_PARTITION_FETCH_BYTES_CONFIG. Когда у нас есть потребитель, прослушивающий несколько разделов, FETCH_MAX_BYTES_CONFIG представляет размер сообщения, которое может быть получено из нескольких разделов. С другой стороны, конфигурация MAX_PARTITION_FETCH_BYTES_CONFIG представляет размер выборки сообщений из одного раздела.

7. Альтернативы

Мы увидели, как различные конфигурации в Kafka Producer, Topic, Broker и Kafka Consumer могут быть обновлены для отправки больших сообщений. Однако в целом нам следует избегать отправки больших сообщений с помощью Kafka. Обработка больших сообщений потребляет больше процессора и памяти нашего производителя и потребителя. Следовательно, в конечном итоге несколько ограничивает их возможности обработки для других задач. Кроме того, это может вызвать заметно большую задержку для конечного пользователя.

Рассмотрим другие возможные варианты:

  1. Производитель Kafka предоставляет функцию сжатия сообщений. Кроме того, он поддерживает различные типы сжатия, которые мы можем настроить с помощью свойства Compression.type .
  2. Мы можем хранить большие сообщения в файле в общем хранилище и отправлять его через сообщение Kafka. Это может быть более быстрый вариант с минимальными затратами на обработку.
  3. Другим вариантом может быть разделение большого сообщения на маленькие сообщения размером 1 КБ каждое на стороне производителя. После этого мы можем отправить все эти сообщения в один раздел, используя ключ раздела, чтобы обеспечить правильный порядок. Поэтому позже, на стороне потребителя, мы можем реконструировать большое сообщение из сообщений меньшего размера.

Если ни один из вышеперечисленных вариантов не соответствует нашим требованиям, мы можем использовать ранее обсуждавшиеся конфигурации.

8. Заключение

В этой статье мы рассмотрели различные конфигурации Kafka, необходимые для отправки больших сообщений размером более 1 МБ.

Мы рассмотрели потребности в конфигурациях на стороне производителя, темы, брокера и потребителя. Однако некоторые из них являются обязательными конфигурациями, а некоторые — необязательными. Кроме того, потребительские конфигурации являются необязательными, но они полезны, чтобы избежать негативного влияния на производительность.

В конце мы также рассмотрели альтернативные возможные варианты отправки больших сообщений.

Как всегда, пример кода доступен на GitHub .