1. Обзор
Задержка группы потребителей Kafka — ключевой показатель производительности любой управляемой событиями системы на основе Kafka .
В этом руководстве мы создадим приложение-анализатор для отслеживания потребительского отставания Kafka.
2. Отставание потребителей
Задержка потребителя — это просто разница между последним зафиксированным смещением потребителя и конечным смещением производителя в журнале. Другими словами, потребительская задержка измеряет задержку между созданием и потреблением сообщений в любой системе производитель-потребитель.
В этом разделе давайте разберемся, как мы можем определить значения смещения.
2.1. Kafka AdminClient
Чтобы проверить значения смещения группы потребителей, нам понадобится административный клиент Kafka . Итак, давайте напишем метод в классе LagAnalyzerService
для создания экземпляра класса AdminClient
:
private AdminClient getAdminClient(String bootstrapServerConfig) {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
return AdminClient.create(config);
}
Мы должны отметить использование аннотации @Value
для получения списка загрузочных серверов из файла свойств. Точно так же мы будем использовать эту аннотацию для получения других значений, таких как groupId и тематика
.
2.2. Смещение группы потребителей
Во- первых, мы можем использовать метод listConsumerGroupOffsets () класса
AdminClient
для получения информации о смещении определенного идентификатора группы потребителей.
Далее мы сосредоточимся в основном на значениях смещения, поэтому мы можем вызвать метод partitionsToOffsetAndMetadata()
, чтобы получить карту значений TopicPartition и OffsetAndMetadata
:
private Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId)
throws ExecutionException, InterruptedException {
ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get();
Map<TopicPartition, Long> groupOffset = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndMetadata metadata = entry.getValue();
groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
}
return groupOffset;
}
Наконец, мы можем заметить итерацию по темеPartitionOffsetAndMetadataMap
, чтобы ограничить наши полученные результаты значениями смещения для каждой темы и раздела.
2.3. Смещение производителя
Единственное, что осталось для нахождения отставания группы потребителей, — это способ получения значений конечного смещения. Для этого мы можем использовать метод endOffsets () класса
KafkaConsumer
.
Начнем с создания экземпляра класса KafkaConsumer
в классе LagAnalyzerService
:
private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(properties);
}
Затем давайте агрегируем все соответствующие значения TopicPartition из смещений группы потребителей, для которых нам нужно вычислить отставание, чтобы мы предоставили его в качестве аргумента методу endOffsets()
:
private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
List<TopicPartition> topicPartitions = new LinkedList<>();
for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
TopicPartition key = entry.getKey();
topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
}
return kafkaConsumer.endOffsets(topicPartitions);
}
Наконец, давайте напишем метод, который использует смещения потребителя и конечные смещения производителя для
создания задержки для каждого TopicPartition
: ``
private Map<TopicPartition, Long> computeLags(
Map<TopicPartition, Long> consumerGrpOffsets,
Map<TopicPartition, Long> producerOffsets) {
Map<TopicPartition, Long> lags = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
Long producerOffset = producerOffsets.get(entry.getKey());
Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
long lag = Math.abs(producerOffset - consumerOffset);
lags.putIfAbsent(entry.getKey(), lag);
}
return lags;
}
3. Анализатор запаздывания
Теперь давайте организуем анализ задержки, написав метод analysisLag()
в классе LagAnalyzerService
:
public void analyzeLag(String groupId) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
String topic = lagEntry.getKey().topic();
int partition = lagEntry.getKey().partition();
Long lag = lagEntry.getValue();
System.out.printf("Time=%s | Lag for topic = %s, partition = %s is %d\n",
MonitoringUtil.time(),
topic,
partition,
lag);
}
}
Однако, когда дело доходит до мониторинга метрики задержки, нам потребуется значение задержки почти в реальном времени, чтобы мы могли предпринять любые административные действия для восстановления производительности системы .
Одним из простых способов добиться этого является опрос значения задержки через регулярные промежутки времени . Итак, давайте создадим службу LiveLagAnalyzerService
, которая будет вызывать метод analysisLag()
службы LagAnalyzerService:
@Scheduled(fixedDelay = 5000L)
public void liveLagAnalysis() throws ExecutionException, InterruptedException {
lagAnalyzerService.analyzeLag(groupId);
}
Для нашей цели мы установили частоту опроса на 5 секунд
, используя аннотацию @Scheduled
. Однако для мониторинга в реальном времени нам, вероятно, потребуется сделать это доступным через JMX .
4. Моделирование
В этом разделе мы смоделируем производителя и потребителя Kafka для локальной установки Kafka, чтобы мы могли увидеть LagAnalyzer
в действии, не завися от внешнего производителя и потребителя Kafka.
4.1. Режим моделирования
Поскольку режим симуляции требуется только для демонстрационных целей , у нас должен быть механизм его отключения, когда мы хотим запустить приложение Lag Analyzer для реального сценария.
Мы можем сохранить это как настраиваемое свойство в файле ресурсов application.properties :
monitor.producer.simulate=true
monitor.consumer.simulate=true
Мы подключим эти свойства к производителю и потребителю Kafka и будем контролировать их поведение.
Кроме того, давайте определим производителя startTime
, endTime
и вспомогательный метод time()
для получения текущего времени во время мониторинга:
public static final Date startTime = new Date();
public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);
public static String time() {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
LocalDateTime now = LocalDateTime.now();
String date = dtf.format(now);
return date;
}
4.2. Конфигурации производитель-потребитель
Нам потребуется определить несколько основных значений конфигурации для создания экземпляров для наших симуляторов потребителей и производителей Kafka.
Во-первых, давайте определим конфигурацию для симулятора потребителя в классе KafkaConsumerConfig
:
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
if (enabled) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
} else {
props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
}
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
if (enabled) {
factory.setConsumerFactory(consumerFactory(groupId));
} else {
factory.setConsumerFactory(consumerFactory(simulateGroupId));
}
return factory;
}
Далее мы можем определить конфигурацию симулятора производителя в классе KafkaProducerConfig
:
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
Далее, воспользуемся аннотацией @KafkaListener
для указания целевого прослушивателя, который, разумеется, включается только тогда, когда для параметра monitor.consumer.simulate
установлено значение true
:
@KafkaListener(
topics = "${monitor.topic.name}",
containerFactory = "kafkaListenerContainerFactory",
autoStartup = "${monitor.consumer.simulate}")
public void listen(String message) throws InterruptedException {
Thread.sleep(10L);
}
Таким образом, мы добавили время ожидания в 10 миллисекунд, чтобы создать искусственную задержку потребителя.
Наконец, давайте напишем метод sendMessage()
для имитации производителя :
@Scheduled(fixedDelay = 1L, initialDelay = 5L)
public void sendMessage() throws ExecutionException, InterruptedException {
if (enabled) {
if (endTime.after(new Date())) {
String message = "msg-" + time();
SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
}
}
}
Мы можем заметить, что производитель будет генерировать сообщения со скоростью 1 сообщение/мс. Кроме того, он перестанет создавать сообщения после endTime через
30 секунд
после startTime
симуляции.
4.3. Живой мониторинг
Теперь давайте запустим основной метод в нашем LagAnalyzerApplication:
public static void main(String[] args) {
SpringApplication.run(LagAnalyzerApplication.class, args);
while (true) ;
}
Через каждые 30 секунд мы увидим текущую задержку по каждому разделу темы:
Time=2021/06/06 11:07:24 | Lag for topic = foreachTopic, partition = 0 is 93
Time=2021/06/06 11:07:29 | Lag for topic = foreachTopic, partition = 0 is 290
Time=2021/06/06 11:07:34 | Lag for topic = foreachTopic, partition = 0 is 776
Time=2021/06/06 11:07:39 | Lag for topic = foreachTopic, partition = 0 is 1159
Time=2021/06/06 11:07:44 | Lag for topic = foreachTopic, partition = 0 is 1559
Time=2021/06/06 11:07:49 | Lag for topic = foreachTopic, partition = 0 is 2015
Time=2021/06/06 11:07:54 | Lag for topic = foreachTopic, partition = 0 is 1231
Time=2021/06/06 11:07:59 | Lag for topic = foreachTopic, partition = 0 is 731
Time=2021/06/06 11:08:04 | Lag for topic = foreachTopic, partition = 0 is 231
Time=2021/06/06 11:08:09 | Lag for topic = foreachTopic, partition = 0 is 0
Таким образом, скорость, с которой производитель создает сообщения, составляет 1 сообщение/мс, что выше, чем скорость, с которой потребитель потребляет сообщение. Итак, первые 30 секунд лаг будет нарастать, после чего производитель перестанет производить, так что лаг постепенно снизится до 0 .
5. Вывод
В этом уроке мы развили понимание того, как найти отставание потребителя по теме Kafka. Кроме того, мы использовали эти знания для создания приложения LagAnalyzer
весной, которое показывает отставание практически в реальном времени .
Как всегда, полный исходный код руководства доступен на GitHub .