Перейти к основному содержимому

Использование Kafka MockConsumer

· 6 мин. чтения

1. Обзор

В этом руководстве мы рассмотрим MockConsumer , одну из реализаций Kafka Consumer .

Во-первых, мы обсудим, что необходимо учитывать при тестировании Kafka Consumer . Затем мы увидим, как мы можем использовать MockConsumer для реализации тестов.

2. Тестирование потребителя Kafka

Использование данных из Kafka состоит из двух основных этапов. Во-первых, мы должны подписываться на темы или назначать разделы тем вручную. Во-вторых, мы опрашиваем пакеты записей методом опроса .

Опрос обычно выполняется в бесконечном цикле. Это потому, что мы обычно хотим потреблять данные непрерывно.

Например, давайте рассмотрим простую логику потребления, состоящую только из подписки и цикла опроса:

void consume() {
try {
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> processRecord(record));
}
} catch (WakeupException ex) {
// ignore for shutdown
} catch (RuntimeException ex) {
// exception handling
} finally {
consumer.close();
}
}

Глядя на приведенный выше код, мы видим, что есть несколько вещей, которые мы можем протестировать:

  • подписка
  • цикл опроса
  • обработка исключений
  • если Consumer был закрыт правильно

У нас есть несколько вариантов проверки логики потребления.

Мы можем использовать экземпляр Kafka в памяти. Но этот подход имеет некоторые недостатки. В общем, экземпляр Kafka в памяти делает тесты очень тяжелыми и медленными. Более того, его настройка — непростая задача и может привести к нестабильным тестам.

В качестве альтернативы, мы можем использовать mocking framework, чтобы издеваться над Consumer. Хотя использование этого подхода делает тесты легковесными, его настройка может быть несколько сложной.

Последний и, возможно, лучший вариант — использовать MockConsumer , который является реализацией Consumer , предназначенной для тестирования. Это не только помогает нам создавать легкие тесты, но и легко настраивается .

Давайте посмотрим на функции, которые он предоставляет.

3. Использование MockConsumer

MockConsumer реализует интерфейс Consumer , который предоставляет библиотека kafka-clients . Таким образом, он полностью имитирует поведение реального Потребителя , и нам не нужно писать много кода . ** `` `` **

Давайте рассмотрим несколько примеров использования MockConsumer . В частности, мы возьмем несколько распространенных сценариев, с которыми мы можем столкнуться при тестировании пользовательского приложения, и реализуем их с помощью MockConsumer .

В нашем примере давайте рассмотрим приложение, которое использует обновления населения страны из темы Kafka. Обновления содержат только название страны и ее текущее население:

class CountryPopulation {

private String country;
private Integer population;

// standard constructor, getters and setters
}

Наш потребитель просто опрашивает обновления с помощью экземпляра Kafka Consumer , обрабатывает их и в конце фиксирует смещение с помощью метода commitSync :

public class CountryPopulationConsumer {

private Consumer<String, Integer> consumer;
private java.util.function.Consumer<Throwable> exceptionConsumer;
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;

// standard constructor

void startBySubscribing(String topic) {
consume(() -> consumer.subscribe(Collections.singleton(topic)));
}

void startByAssigning(String topic, int partition) {
consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
}

private void consume(Runnable beforePollingTask) {
try {
beforePollingTask.run();
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
StreamSupport.stream(records.spliterator(), false)
.map(record -> new CountryPopulation(record.key(), record.value()))
.forEach(countryPopulationConsumer);
consumer.commitSync();
}
} catch (WakeupException e) {
System.out.println("Shutting down...");
} catch (RuntimeException ex) {
exceptionConsumer.accept(ex);
} finally {
consumer.close();
}
}

public void stop() {
consumer.wakeup();
}
}

3.1. Создание экземпляра MockConsumer

Далее давайте посмотрим, как мы можем создать экземпляр MockConsumer :

@BeforeEach
void setUp() {
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
updates = new ArrayList<>();
countryPopulationConsumer = new CountryPopulationConsumer(consumer,
ex -> this.pollException = ex, updates::add);
}

По сути, все, что нам нужно предоставить, — это стратегию сброса смещения.

Обратите внимание, что мы используем обновления для сбора записей , которые получит countryPopulationConsumer . Это поможет нам утвердить ожидаемые результаты.

Точно так же мы используем pollException для сбора и утверждения исключений.

Для всех тестов мы будем использовать описанный выше метод настройки. Теперь давайте рассмотрим несколько тестов для потребительского приложения.

3.2. Назначение тематических разделов

Для начала создадим тест для метода startByAssigning :

@Test
void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() {
// GIVEN
consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)));
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
startOffsets.put(tp, 0L);
consumer.updateBeginningOffsets(startOffsets);

// WHEN
countryPopulationConsumer.startByAssigning(TOPIC, PARTITION);

// THEN
assertThat(updates).hasSize(1);
assertThat(consumer.closed()).isTrue();
}

Сначала мы настроили MockConsumer. Начнем с добавления записи потребителю с помощью метода addRecord .

Первое, что нужно помнить, это то, что мы не можем добавлять записи до назначения темы или подписки на нее . Именно поэтому мы планируем задачу опроса с помощью метода schedulePollTask . Запланированная нами задача будет выполняться при первом опросе до получения записей. Таким образом, добавление записи произойдет после того, как произойдет присвоение.

Не менее важно и то, что мы не можем добавлять в MockConsumer записи, не принадлежащие теме и назначенному ей разделу .

Затем, чтобы убедиться, что потребитель не работает бесконечно, мы настраиваем его на отключение при втором опросе.

Кроме того, мы должны установить начальные смещения. Мы делаем это с помощью метода updateBeginningOffsets .

В конце мы проверяем, правильно ли мы использовали обновление, и потребитель закрывается.

3.3. Подписка на темы

Теперь давайте создадим тест для нашего метода startBySubscribe :

@Test
void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() {
// GIVEN
consumer.schedulePollTask(() -> {
consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
consumer.addRecord(record("Romania", 1000, TOPIC, 0));
});
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(TOPIC, 0);
startOffsets.put(tp, 0L);
consumer.updateBeginningOffsets(startOffsets);

// WHEN
countryPopulationConsumer.startBySubscribing(TOPIC);

// THEN
assertThat(updates).hasSize(1);
assertThat(consumer.closed()).isTrue();
}

В этом случае первое, что нужно сделать перед добавлением записи, — это перебалансировать . Мы делаем это, вызывая метод перебалансировки , который имитирует перебалансировку.

Остальное такое же, как и в тестовом примере startByAssigning .

3.4. Управление циклом опроса

Мы можем управлять циклом опроса несколькими способами.

Первый вариант — запланировать задачу опроса, как мы это делали в тестах выше. Мы делаем это через schedulePollTask, который принимает Runnable в качестве параметра. Каждая запланированная нами задача будет выполняться при вызове метода опроса .

Второй вариант, который у нас есть, — вызвать метод пробуждения . Обычно именно так мы прерываем длинный опрос. Собственно, так мы реализовали метод остановки в CountryPopulationConsumer.

Наконец, мы можем установить исключение, которое будет выброшено, используя метод setPollException :

@Test
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
// GIVEN
consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(TOPIC, 0);
startOffsets.put(tp, 0L);
consumer.updateBeginningOffsets(startOffsets);

// WHEN
countryPopulationConsumer.startBySubscribing(TOPIC);

// THEN
assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
assertThat(consumer.closed()).isTrue();
}

3.5. Имитация конечных смещений и информации о разделах

Если наша логика потребления основана на конечных смещениях или информации о разделах, мы также можем имитировать их с помощью MockConsumer .

Когда мы хотим имитировать конечное смещение, мы можем использовать методы addEndOffsets и updateEndOffsets .

И, если мы хотим имитировать информацию о разделе, мы можем использовать метод updatePartitions .

4. Вывод

В этой статье мы рассмотрели, как использовать MockConsumer для тестирования потребительского приложения Kafka.

Во-первых, мы рассмотрели пример потребительской логики и основные части для тестирования. Затем мы протестировали простое потребительское приложение Kafka, используя MockConsumer .

Попутно мы рассмотрели возможности MockConsumer и способы его использования.

Как всегда, все эти примеры кода доступны на GitHub .