1. Обзор
Apache Kafka — мощная, распределенная, отказоустойчивая система обработки потоков. В предыдущем уроке мы узнали, как работать со Spring и Kafka .
В этом руководстве мы будем основываться на предыдущем и узнаем, как писать надежные автономные интеграционные тесты, которые не зависят от работающего внешнего сервера Kafka.
Во-первых, мы начнем с рассмотрения того, как использовать и настраивать встроенный экземпляр Kafka.
Затем мы увидим, как мы можем использовать популярный фреймворк Testcontainers из наших тестов.
2. Зависимости
Конечно, нам нужно добавить стандартную зависимость spring-
kafka в наш pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
Затем нам понадобятся еще две зависимости специально для наших тестов.
Во- первых, мы добавим артефакт spring-kafka-test
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
И, наконец, мы добавим зависимость Testcontainers Kafka, которая также доступна на Maven Central :
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
Теперь, когда у нас настроены все необходимые зависимости, мы можем написать простое приложение Spring Boot, используя Kafka.
3. Простое производственно-потребительское приложение Kafka
На протяжении всего этого руководства в центре внимания наших тестов будет простое приложение Spring Boot Kafka производителя-потребителя.
Давайте начнем с определения точки входа нашего приложения:
@SpringBootApplication
public class KafkaProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
}
}
Как мы видим, это стандартное приложение Spring Boot.
3.1. Настройка продюсера
Далее давайте рассмотрим bean-компонент производителя, который мы будем использовать для отправки сообщений в заданную тему Kafka:
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
Наш bean-компонент KafkaProducer,
определенный выше, является просто оболочкой класса KafkaTemplate
. Этот класс обеспечивает высокоуровневые потокобезопасные операции, такие как отправка данных в указанную тему, что мы и делаем в нашем методе send .
3.2. Настройка потребителя
Точно так же мы теперь определим простой потребительский компонент, который будет прослушивать тему Kafka и получать сообщения:
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
}
Наш простой потребитель использует аннотацию @KafkaListener в методе
получения
для прослушивания сообщений по заданной теме. Позже мы увидим, как мы настроим test.topic
из наших тестов.
Кроме того, метод получения сохраняет содержимое сообщения в нашем bean-компоненте и уменьшает значение счетчика переменной - защелки .
Эта переменная представляет собой простое потокобезопасное поле счетчика, которое мы будем использовать позже в наших тестах, чтобы убедиться, что мы успешно получили сообщение.
Теперь, когда у нас реализовано наше простое приложение Kafka с использованием Spring Boot, давайте посмотрим, как мы можем писать интеграционные тесты.
4. Несколько слов о тестировании
В общем, при написании чистых интеграционных тестов мы не должны зависеть от внешних сервисов, которые мы не можем контролировать или которые могут внезапно перестать работать. Это может отрицательно сказаться на результатах наших тестов.
Точно так же, если мы зависим от внешней службы, в данном случае от работающего брокера Kafka, мы, скорее всего, не сможем настроить его, контролировать и отключить так, как мы хотим в наших тестах.
4.1. Свойства приложения
Мы собираемся использовать очень легкий набор свойств конфигурации приложения из наших тестов.
Мы определим эти свойства в нашем файле src/test/resources/application.yml
:
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: foreach
test:
topic: embedded-test-topic
Это минимальный набор свойств, который нам нужен при работе со встроенным экземпляром Kafka или локальным брокером.
Большинство из них говорят сами за себя, но мы должны выделить свойство auto-offset-reset: Early
. Это свойство гарантирует, что наша группа потребителей получит сообщения, которые мы отправляем, потому что контейнер может запуститься после завершения отправки.
Кроме того, мы настраиваем свойство темы со значением embedded-test-topic
, и это тема, которую мы будем использовать в наших тестах.
5. Тестирование с использованием встроенной Kafka
В этом разделе мы рассмотрим, как использовать экземпляр Kafka в памяти для запуска наших тестов. Это также известно как Embedded Kafka.
Зависимость spring-kafka-test
, которую мы добавили ранее, содержит несколько полезных утилит, помогающих тестировать наше приложение. В частности, он содержит класс EmbeddedKafkaBroker .
Имея это в виду, давайте напишем наш первый интеграционный тест:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own simple KafkaProducer");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
Давайте пройдемся по ключевым частям нашего теста.
Во-первых, мы начнем с украшения нашего тестового класса двумя довольно стандартными аннотациями Spring:
Аннотация @SpringBootTest
гарантирует , что наш тест загружает контекст приложения Spring.- Мы также используем аннотацию
@DirtiesContext
, которая гарантирует очистку и сброс этого контекста между различными тестами.
Наступает важная часть — мы используем аннотацию @EmbeddedKafka
для внедрения экземпляра EmbeddedKafkaBroker
в наши тесты.
Кроме того, есть несколько доступных свойств, которые мы можем использовать для настройки встроенного узла Kafka:
разделы
— это количество разделов, используемых для каждой темы. Чтобы все было красиво и просто, мы хотим использовать только один из наших тестов.BrokerProperties
— дополнительные свойства для брокера Kafka. Опять же, мы делаем все просто и указываем прослушиватель простого текста и номер порта.
Затем мы автоматически связываем наши классы потребителей
и производителей
и настраиваем тему для использования значения из нашего application.properties
.
В качестве последней части головоломки мы просто отправляем сообщение в нашу тестовую тему и проверяем, что сообщение было получено и содержит название нашей тестовой темы.
Когда мы запустим наш тест, вот что мы увидим среди подробного вывода Spring:
...
12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer -
sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
INFO c.b.kafka.embedded.KafkaConsumer - received payload=
'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
CreateTime = 1605267935099, serialized key size = -1,
serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
key = null, value = Отправка с помощью нашего собственного простого KafkaProducer)'
Это подтверждает, что наш тест работает правильно. Потрясающий! Теперь у нас есть способ написать автономные, независимые интеграционные тесты, используя брокера Kafka в памяти.
6. Тестирование Kafka с помощью TestContainers
Иногда мы можем увидеть небольшие различия между реальной внешней службой и встроенным в память экземпляром службы, который был специально предоставлен для целей тестирования. Хотя маловероятно, также может быть, что порт, использованный в нашем тесте, может быть занят, что приведет к сбою.
Имея это в виду, в этом разделе мы увидим вариант нашего предыдущего подхода к тестированию с использованием среды Testcontainers . Мы увидим, как создать экземпляр внешнего брокера Apache Kafka, размещенного внутри контейнера Docker , и управлять им из нашего интеграционного теста.
Давайте определим еще один интеграционный тест, который будет очень похож на тот, который мы видели в предыдущем разделе:
@RunWith(SpringRunner.class)
@Import(com.foreach.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own controller");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
Давайте посмотрим на различия. Мы объявляем поле kafka
, которое является стандартным JUnit @ClassRule
. Это поле является экземпляром класса KafkaContainer
, который будет подготавливать и управлять жизненным циклом нашего контейнера с Kafka.
Чтобы избежать конфликтов портов, Testcontainers динамически выделяет номер порта при запуске нашего контейнера Docker.
По этой причине мы предоставляем пользовательскую конфигурацию фабрики-потребителя и производителя, используя класс KafkaTestContainersConfiguration
:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foreach");
// more standard configuration
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
// more standard configuration
return new DefaultKafkaProducerFactory<>(configProps);
}
Затем мы ссылаемся на эту конфигурацию через аннотацию @Import
в начале нашего теста.
Причина этого в том, что нам нужен способ внедрить адрес сервера в наше приложение, которое, как упоминалось ранее, генерируется динамически.
Мы достигаем этого, вызывая метод getBootstrapServers()
, который возвращает местоположение сервера начальной загрузки :
bootstrap.servers = [PLAINTEXT://localhost:32789]
Теперь, когда мы запустим наш тест, мы должны увидеть, что Testcontainers делает несколько вещей:
- Проверяет нашу локальную настройку Docker
- При необходимости извлекает образ докера
confluentinc/cp-kafka:5.4.3
. - Запускает новый контейнер и ждет, пока он будет готов
- Наконец, выключается и удаляет контейнер после завершения нашего теста.
Опять же, это подтверждается проверкой вывода теста:
13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
Престо! Рабочий интеграционный тест с использованием док-контейнера Kafka.
7. Заключение
В этой статье мы узнали о нескольких подходах к тестированию приложений Kafka с помощью Spring Boot.
В первом подходе мы увидели, как настроить и использовать локального брокера Kafka в памяти.
Затем мы увидели, как использовать Testcontainers для настройки внешнего брокера Kafka, работающего внутри док-контейнера из наших тестов.
Как всегда, полный исходный код статьи доступен на GitHub .