Перейти к основному содержимому

Использование Kafka MockProducer

· 5 мин. чтения

1. Обзор

Kafka — это система обработки сообщений, построенная на основе распределенной очереди сообщений. Он предоставляет библиотеку Java, чтобы приложения могли записывать данные или считывать данные из темы Kafka.

Теперь, поскольку большая часть логики бизнес-домена проверяется с помощью модульных тестов, приложения обычно имитируют все операции ввода-вывода в JUnit. Kafka также предоставляет MockProducer для имитации приложения производителя.

В этом руководстве мы сначала реализуем приложение производителя Kafka. Позже мы реализуем модульный тест для проверки общих операций производителя с помощью MockProducer .

2. Зависимости Maven

Прежде чем мы реализуем приложение-производитель, мы добавим зависимость Maven для kafka-clients :

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>

3. МокПродюсер

Библиотека kafka-clients содержит библиотеку Java для публикации и использования сообщений в Kafka. Приложения-производители могут использовать эти API для отправки записей "ключ-значение" в тему Kafka:

public class KafkaProducer {

private final Producer<String, String> producer;

public KafkaProducer(Producer<String, String> producer) {
this.producer = producer;
}

public Future<RecordMetadata> send(String key, String value) {
ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
return producer.send(record);
}
}

Любой производитель Kafka должен реализовать интерфейс Producer в клиентской библиотеке. Kafka также предоставляет класс KafkaProducer , который представляет собой конкретную реализацию, выполняющую операции ввода-вывода по отношению к брокеру Kafka.

Кроме того, Kafka предоставляет MockProducer , который реализует тот же интерфейс Producer и имитирует все операции ввода-вывода, реализованные в KafkaProducer :

@Test
void givenKeyValue_whenSend_thenVerifyHistory() {

MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());

kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("soccer",
"{\"site\" : \"foreach\"}");

assertTrue(mockProducer.history().size() == 1);
}

Хотя такие операции ввода-вывода также можно смоделировать с помощью Mockito , MockProducer предоставляет нам доступ ко многим функциям, которые нам нужно реализовать поверх нашего макета. Одной из таких функций является метод history() . MockProducer кэширует записи, для которых вызывается send() , тем самым позволяя нам проверять поведение производителя при публикации.

Кроме того, мы также можем проверить метаданные, такие как имя темы, раздел, ключ записи или значение:

assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data"));
assertTrue(recordMetadataFuture.get().partition() == 0);

4. Насмешка над кластером Kafka

До сих пор в наших фиктивных тестах мы предполагали тему только с одним разделом. Однако для достижения максимального параллелизма между потоками производителя и потребителя темы Kafka обычно разбиваются на несколько разделов.

Это позволяет производителям записывать данные в несколько разделов. Обычно это достигается путем разделения записей на основе ключа и сопоставления определенных ключей с определенным разделом:

public class EvenOddPartitioner extends DefaultPartitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
if (((String)key).length() % 2 == 0) {
return 0;
}
return 1;
}
}

Из-за этого все ключи четной длины будут опубликованы в разделе «0», а ключи нечетной длины — в разделе «1».

MockProducer позволяет нам проверять такие алгоритмы назначения разделов, имитируя кластер Kafka с несколькими разделами:

@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber()
throws ExecutionException, InterruptedException {
PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
List<PartitionInfo> list = new ArrayList<>();
list.add(partitionInfo0);
list.add(partitionInfo1);

Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(),
new StringSerializer(), new StringSerializer());

kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition",
"{\"site\" : \"foreach\"}");

assertTrue(recordMetadataFuture.get().partition() == 1);
}

Мы смоделировали кластер с двумя разделами, 0 и 1. Затем мы можем убедиться, что EvenOddPartitioner публикует запись в разделе 1.

5. Имитация ошибок с помощью MockProducer

До сих пор мы только издевались над продюсером, чтобы он успешно отправил запись в тему Кафки. Но что произойдет, если при записи записи возникнет исключение?

Приложения обычно обрабатывают такие исключения, повторяя попытку или выдавая исключение клиенту.

MockProducer позволяет нам имитировать исключения во время send() , чтобы мы могли проверить код обработки исключений:

@Test
void givenKeyValue_whenSend_thenReturnException() {
MockProducer<String, String> mockProducer = new MockProducer<>(false,
new StringSerializer(), new StringSerializer())

kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"foreach\"}");
RuntimeException e = new RuntimeException();
mockProducer.errorNext(e);

try {
record.get();
} catch (ExecutionException | InterruptedException ex) {
assertEquals(e, ex.getCause());
}
assertTrue(record.isDone());
}

В этом коде есть две примечательные вещи.

Во-первых, мы вызвали конструктор MockProducer с autoComplete как false. Это говорит MockProducer ждать ввода перед завершением метода send() .

Во- вторых, мы вызовем mockProducer.errorNext(e), чтобы MockProducer вернул исключение для последнего вызова send() .

6. Моделирование транзакционных записей с помощью MockProducer

Kafka 0.11 представила транзакции между брокерами Kafka, производителями и потребителями. Это позволило семантику сквозной доставки сообщений Exactly-Once в Kafka. Короче говоря, это означает, что производители транзакций могут публиковать записи только для брокера с двухфазным протоколом фиксации.

MockProducer также поддерживает транзакционную запись и позволяет нам проверить это поведение:

@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
MockProducer<String, String> mockProducer = new MockProducer<>(true,
new StringSerializer(), new StringSerializer())

kafkaProducer = new KafkaProducer(mockProducer);
kafkaProducer.initTransaction();
kafkaProducer.beginTransaction();
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"foreach\"}");

assertTrue(mockProducer.history().isEmpty());
kafkaProducer.commitTransaction();
assertTrue(mockProducer.history().size() == 1);
}

Поскольку MockProducer также поддерживает те же API, что и конкретный KafkaProducer, он обновляет историю только после фиксации транзакции. Такое фиктивное поведение может помочь приложениям проверить, что commitTransaction() вызывается для каждой транзакции.

7. Заключение

В этой статье мы рассмотрели класс MockProducer библиотеки kafka -client . Мы обсуждали, что MockProducer реализует ту же иерархию, что и конкретный KafkaProducer , и поэтому мы можем имитировать все операции ввода-вывода с помощью брокера Kafka.

Мы также обсудили некоторые сложные фиктивные сценарии и смогли протестировать исключения, секционирование и транзакции с помощью MockProducer.

Как всегда, все примеры кода доступны на GitHub .