Перейти к основному содержимому

Отслеживайте отставание потребителей в Apache Kafka

· 7 мин. чтения

1. Обзор

Задержка группы потребителей Kafka — ключевой показатель производительности любой управляемой событиями системы на основе Kafka .

В этом руководстве мы создадим приложение-анализатор для отслеживания потребительского отставания Kafka.

2. Отставание потребителей

Задержка потребителя — это просто разница между последним зафиксированным смещением потребителя и конечным смещением производителя в журнале. Другими словами, потребительская задержка измеряет задержку между созданием и потреблением сообщений в любой системе производитель-потребитель.

В этом разделе давайте разберемся, как мы можем определить значения смещения.

2.1. Kafka AdminClient

Чтобы проверить значения смещения группы потребителей, нам понадобится административный клиент Kafka . Итак, давайте напишем метод в классе LagAnalyzerService для создания экземпляра класса AdminClient :

private AdminClient getAdminClient(String bootstrapServerConfig) {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
return AdminClient.create(config);
}

Мы должны отметить использование аннотации @Value для получения списка загрузочных серверов из файла свойств. Точно так же мы будем использовать эту аннотацию для получения других значений, таких как groupId и тематика .

2.2. Смещение группы потребителей

Во- первых, мы можем использовать метод listConsumerGroupOffsets () класса AdminClient для получения информации о смещении определенного идентификатора группы потребителей.

Далее мы сосредоточимся в основном на значениях смещения, поэтому мы можем вызвать метод partitionsToOffsetAndMetadata() , чтобы получить карту значений TopicPartition и OffsetAndMetadata :

private Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId) 
throws ExecutionException, InterruptedException {
ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get();

Map<TopicPartition, Long> groupOffset = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndMetadata metadata = entry.getValue();
groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
}
return groupOffset;
}

Наконец, мы можем заметить итерацию по темеPartitionOffsetAndMetadataMap , чтобы ограничить наши полученные результаты значениями смещения для каждой темы и раздела.

2.3. Смещение производителя

Единственное, что осталось для нахождения отставания группы потребителей, — это способ получения значений конечного смещения. Для этого мы можем использовать метод endOffsets () класса KafkaConsumer .

Начнем с создания экземпляра класса KafkaConsumer в классе LagAnalyzerService :

private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(properties);
}

Затем давайте агрегируем все соответствующие значения TopicPartition из смещений группы потребителей, для которых нам нужно вычислить отставание, чтобы мы предоставили его в качестве аргумента методу endOffsets() :

private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
List<TopicPartition> topicPartitions = new LinkedList<>();
for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
TopicPartition key = entry.getKey();
topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
}
return kafkaConsumer.endOffsets(topicPartitions);
}

Наконец, давайте напишем метод, который использует смещения потребителя и конечные смещения производителя для создания задержки для каждого TopicPartition : ``

private Map<TopicPartition, Long> computeLags(
Map<TopicPartition, Long> consumerGrpOffsets,
Map<TopicPartition, Long> producerOffsets) {
Map<TopicPartition, Long> lags = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
Long producerOffset = producerOffsets.get(entry.getKey());
Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
long lag = Math.abs(producerOffset - consumerOffset);
lags.putIfAbsent(entry.getKey(), lag);
}
return lags;
}

3. Анализатор запаздывания

Теперь давайте организуем анализ задержки, написав метод analysisLag() в классе LagAnalyzerService :

public void analyzeLag(String groupId) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
String topic = lagEntry.getKey().topic();
int partition = lagEntry.getKey().partition();
Long lag = lagEntry.getValue();
System.out.printf("Time=%s | Lag for topic = %s, partition = %s is %d\n",
MonitoringUtil.time(),
topic,
partition,
lag);
}
}

Однако, когда дело доходит до мониторинга метрики задержки, нам потребуется значение задержки почти в реальном времени, чтобы мы могли предпринять любые административные действия для восстановления производительности системы .

Одним из простых способов добиться этого является опрос значения задержки через регулярные промежутки времени . Итак, давайте создадим службу LiveLagAnalyzerService , которая будет вызывать метод analysisLag() службы LagAnalyzerService:

@Scheduled(fixedDelay = 5000L)
public void liveLagAnalysis() throws ExecutionException, InterruptedException {
lagAnalyzerService.analyzeLag(groupId);
}

Для нашей цели мы установили частоту опроса на 5 секунд , используя аннотацию @Scheduled . Однако для мониторинга в реальном времени нам, вероятно, потребуется сделать это доступным через JMX .

4. Моделирование

В этом разделе мы смоделируем производителя и потребителя Kafka для локальной установки Kafka, чтобы мы могли увидеть LagAnalyzer в действии, не завися от внешнего производителя и потребителя Kafka.

4.1. Режим моделирования

Поскольку режим симуляции требуется только для демонстрационных целей , у нас должен быть механизм его отключения, когда мы хотим запустить приложение Lag Analyzer для реального сценария.

Мы можем сохранить это как настраиваемое свойство в файле ресурсов application.properties :

monitor.producer.simulate=true
monitor.consumer.simulate=true

Мы подключим эти свойства к производителю и потребителю Kafka и будем контролировать их поведение.

Кроме того, давайте определим производителя startTime , endTime и вспомогательный метод time() для получения текущего времени во время мониторинга:

public static final Date startTime = new Date();
public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);

public static String time() {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
LocalDateTime now = LocalDateTime.now();
String date = dtf.format(now);
return date;
}

4.2. Конфигурации производитель-потребитель

Нам потребуется определить несколько основных значений конфигурации для создания экземпляров для наших симуляторов потребителей и производителей Kafka.

Во-первых, давайте определим конфигурацию для симулятора потребителя в классе KafkaConsumerConfig :

public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
if (enabled) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
} else {
props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
}
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
if (enabled) {
factory.setConsumerFactory(consumerFactory(groupId));
} else {
factory.setConsumerFactory(consumerFactory(simulateGroupId));
}
return factory;
}

Далее мы можем определить конфигурацию симулятора производителя в классе KafkaProducerConfig :

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

Далее, воспользуемся аннотацией @KafkaListener для указания целевого прослушивателя, который, разумеется, включается только тогда, когда для параметра monitor.consumer.simulate установлено значение true :

@KafkaListener(
topics = "${monitor.topic.name}",
containerFactory = "kafkaListenerContainerFactory",
autoStartup = "${monitor.consumer.simulate}")
public void listen(String message) throws InterruptedException {
Thread.sleep(10L);
}

Таким образом, мы добавили время ожидания в 10 миллисекунд, чтобы создать искусственную задержку потребителя.

Наконец, давайте напишем метод sendMessage() для имитации производителя :

@Scheduled(fixedDelay = 1L, initialDelay = 5L)
public void sendMessage() throws ExecutionException, InterruptedException {
if (enabled) {
if (endTime.after(new Date())) {
String message = "msg-" + time();
SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
}
}
}

Мы можем заметить, что производитель будет генерировать сообщения со скоростью 1 сообщение/мс. Кроме того, он перестанет создавать сообщения после endTime через 30 секунд после startTime симуляции.

4.3. Живой мониторинг

Теперь давайте запустим основной метод в нашем LagAnalyzerApplication:

public static void main(String[] args) {
SpringApplication.run(LagAnalyzerApplication.class, args);
while (true) ;
}

Через каждые 30 секунд мы увидим текущую задержку по каждому разделу темы:

Time=2021/06/06 11:07:24 | Lag for topic = foreachTopic, partition = 0 is 93
Time=2021/06/06 11:07:29 | Lag for topic = foreachTopic, partition = 0 is 290
Time=2021/06/06 11:07:34 | Lag for topic = foreachTopic, partition = 0 is 776
Time=2021/06/06 11:07:39 | Lag for topic = foreachTopic, partition = 0 is 1159
Time=2021/06/06 11:07:44 | Lag for topic = foreachTopic, partition = 0 is 1559
Time=2021/06/06 11:07:49 | Lag for topic = foreachTopic, partition = 0 is 2015
Time=2021/06/06 11:07:54 | Lag for topic = foreachTopic, partition = 0 is 1231
Time=2021/06/06 11:07:59 | Lag for topic = foreachTopic, partition = 0 is 731
Time=2021/06/06 11:08:04 | Lag for topic = foreachTopic, partition = 0 is 231
Time=2021/06/06 11:08:09 | Lag for topic = foreachTopic, partition = 0 is 0

Таким образом, скорость, с которой производитель создает сообщения, составляет 1 сообщение/мс, что выше, чем скорость, с которой потребитель потребляет сообщение. Итак, первые 30 секунд лаг будет нарастать, после чего производитель перестанет производить, так что лаг постепенно снизится до 0 .

5. Вывод

В этом уроке мы развили понимание того, как найти отставание потребителя по теме Kafka. Кроме того, мы использовали эти знания для создания приложения LagAnalyzer весной, которое показывает отставание практически в реальном времени .

Как всегда, полный исходный код руководства доступен на GitHub .