Перейти к основному содержимому

Руководство по Spring Cloud Stream с Kafka, Apache Avro и Confluent Schema Registry

· 9 мин. чтения

Задача: Сумма двух чисел

Напишите функцию twoSum. Которая получает массив целых чисел nums и целую сумму target, а возвращает индексы двух чисел, сумма которых равна target. Любой набор входных данных имеет ровно одно решение, и вы не можете использовать один и тот же элемент дважды. Ответ можно возвращать в любом порядке...

ANDROMEDA

1. Введение

Apache Kafka — это платформа для обмена сообщениями. С его помощью мы можем обмениваться данными между различными приложениями в любом масштабе.

Spring Cloud Stream — это платформа для создания приложений, управляемых сообщениями. Это может упростить интеграцию Kafka в наши сервисы.

Обычно Kafka используется с форматом сообщений Avro, поддерживаемым реестром схем. В этом руководстве мы будем использовать реестр Confluent Schema. Мы попробуем реализацию интеграции Spring с реестром Confluent Schema Registry, а также собственные библиотеки Confluent.

2. Реестр слитных схем

Kafka представляет все данные в виде байтов, поэтому обычно используется внешняя схема, а также сериализация и десериализация в байты в соответствии с этой схемой. Вместо того, чтобы предоставлять копию этой схемы с каждым сообщением, что было бы дорогостоящим накладным расходом, также принято хранить схему в реестре и предоставлять только идентификатор с каждым сообщением.

Confluent Schema Registry предоставляет простой способ хранения, извлечения и управления схемами. Он предоставляет несколько полезных RESTful API .

Схемы хранятся по субъектам, и по умолчанию реестр выполняет проверку совместимости, прежде чем разрешить загрузку новой схемы для субъекта.

Каждый производитель будет знать схему, которую он создает, и каждый потребитель должен иметь возможность либо потреблять данные в ЛЮБОМ формате, либо должен иметь определенную схему, в которой он предпочитает читать. Производитель консультируется с реестром, чтобы установить правильный идентификатор для использования при отправке. сообщение. Потребитель использует реестр для получения схемы отправителя.

Когда потребитель знает как схему отправителя, так и собственный желаемый формат сообщения, библиотека Avro может преобразовать данные в нужный потребителю формат.

3. Апач Авро

Apache Avro — это система сериализации данных .

Он использует структуру JSON для определения схемы, обеспечивая сериализацию между байтами и структурированными данными.

Одной из сильных сторон Avro является поддержка преобразования сообщений, написанных в одной версии схемы, в формат, определяемый совместимой альтернативной схемой.

Набор инструментов Avro также может генерировать классы для представления структур данных этих схем, упрощая сериализацию в POJO и из них.

4. Настройка проекта

Чтобы использовать реестр схем с Spring Cloud Stream , нам потребуются зависимости Spring Cloud Kafka Binder и реестра схем от Maven:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
</dependency>

Для сериализатора Confluent нам нужно:

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>4.0.0</version>
</dependency>

И сериализатор Confluent находится в их репо:

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

Кроме того, давайте воспользуемся подключаемым модулем Maven для создания классов Avro:

<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Для тестирования мы можем использовать существующую настройку Kafka и Schema Registry или использовать докеризированные Confluent и Kafka.

5. Весенний облачный поток

Теперь, когда мы настроили наш проект, давайте напишем производителя, используя Spring Cloud Stream . Он будет публиковать сведения о сотрудниках по теме.

Затем мы создадим потребителя, который будет считывать события из темы и записывать их в оператор журнала.

5.1. Схема

Во-первых, давайте определим схему для сведений о сотрудниках. Мы можем назвать его employee-schema.avsc .

Мы можем сохранить файл схемы в src/main/resources:

{
"type": "record",
"name": "Employee",
"namespace": "com.foreach.schema",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "firstName",
"type": "string"
},
{
"name": "lastName",
"type": "string"
}]
}

После создания приведенной выше схемы нам нужно собрать проект. Затем генератор кода Apache Avro создаст POJO с именем Employee в пакете com.foreach.schema .

5.2. Режиссер

Spring Cloud Stream предоставляет интерфейс процессора . Это дает нам выходной и входной канал.

Давайте воспользуемся этим, чтобы создать производителя, который отправляет объекты Employee в топик Kafka с информацией о сотрудниках :

@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

// creating employee details
Employee employee = new Employee();
employee.setId(empId);
employee.setFirstName(firstName);
employee.setLastName(lastName);

Message<Employee> message = MessageBuilder.withPayload(employee)
.build();

processor.output()
.send(message);
}

5.2. Потребитель

Теперь давайте напишем нашего потребителя:

@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
logger.info("Let's process employee details: {}", employeeDetails);
}

Этот потребитель будет читать события, опубликованные в теме сведений о сотрудниках . Давайте направим его вывод в журнал, чтобы посмотреть, что он делает.

5.3. Привязки Кафки

До сих пор мы работали только с входными и выходными каналами нашего объекта Processor . Эти каналы необходимо настроить с правильными пунктами назначения.

Давайте используем application.yml для предоставления привязок Kafka:

spring:
cloud:
stream:
bindings:
input:
destination: employee-details
content-type: application/*+avro
output:
destination: employee-details
content-type: application/*+avro

Отметим, что в данном случае под пунктом назначения понимается тема Кафки. Может немного сбивать с толку то, что это называется назначением , поскольку в данном случае это источник входных данных, но это согласованный термин для потребителей и производителей.

5.4. Точка входа

Теперь, когда у нас есть производитель и потребитель, давайте представим API для получения входных данных от пользователя и передачи их производителю:

@Autowired
private AvroProducer avroProducer;

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName,
@PathVariable String lastName) {
avroProducer.produceEmployeeDetails(id, firstName, lastName);
return "Sent employee details to consumer";
}

5.5. Включить реестр Confluent Schema и привязки

Наконец, чтобы наше приложение применяло привязки реестра Kafka и схемы, нам нужно добавить @EnableBinding и @EnableSchemaRegistryClient в один из наших классов конфигурации:

@SpringBootApplication
@EnableBinding(Processor.class)
// The @EnableSchemaRegistryClient annotation needs to be uncommented to use the Spring native method.
// @EnableSchemaRegistryClient
public class AvroKafkaApplication {

public static void main(String[] args) {
SpringApplication.run(AvroKafkaApplication.class, args);
}

}

И мы должны предоставить bean-компонент ConfluentSchemaRegistryClient :

@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endPoint);
return client;
}

EndPoint — это URL-адрес реестра Confluent Schema.

5.6. Тестирование нашего сервиса

Давайте протестируем сервис с помощью POST-запроса:

curl -X POST localhost:8080/employees/1001/Harry/Potter

Журналы говорят нам, что это сработало:

2019-06-11 18:45:45.343  INFO 17036 --- [container-0-C-1] com.foreach.consumer.AvroConsumer       : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}

5.7. Что произошло во время обработки?

Попробуем понять, что именно произошло с нашим примером приложения:

  1. Производитель создал сообщение Kafka, используя объект Employee .
  2. Производитель зарегистрировал схему сотрудника в реестре схем, чтобы получить идентификатор версии схемы, это либо создает новый идентификатор, либо повторно использует существующий для этой точной схемы.
  3. Avro сериализовал объект Employee , используя схему
  4. Spring Cloud поместил идентификатор схемы в заголовки сообщений.
  5. Сообщение опубликовано в теме
  6. Когда сообщение пришло к потребителю, он прочитал идентификатор схемы из заголовка.
  7. Потребитель использовал schema-id для получения схемы Employee из реестра.
  8. Потребитель нашел локальный класс, который мог бы представлять этот объект, и десериализовал в него сообщение.

6. Сериализация/десериализация с использованием собственных библиотек Kafka

Spring Boot предоставляет несколько готовых конвертеров сообщений. По умолчанию Spring Boot использует заголовок Content-Type для выбора соответствующего преобразователя сообщений.

В нашем примере Content-Type — это application/*+avro, поэтому он использовал AvroSchemaMessageConverter для чтения и записи форматов Avro. Но Confluent рекомендует использовать KafkaAvroSerializer и KafkaAvroDeserializer для преобразования сообщений .

Хотя собственный формат Spring работает хорошо, у него есть некоторые недостатки с точки зрения разбиения на разделы, и он несовместим со стандартами Confluent, что может потребоваться некоторым службам, отличным от Spring, в нашем экземпляре Kafka.

Давайте обновим наш application.yml , чтобы использовать преобразователи Confluent:

spring:
cloud:
stream:
default:
producer:
useNativeEncoding: true
consumer:
useNativeEncoding: true
bindings:
input:
destination: employee-details
content-type: application/*+avro
output:
destination: employee-details
content-type: application/*+avro
kafka:
binder:
producer-properties:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true

Мы включили useNativeEncoding . Это заставляет Spring Cloud Stream делегировать сериализацию предоставленным классам.

Мы также должны знать, как мы можем предоставить собственные свойства настроек для Kafka в Spring Cloud, используя kafka.binder.producer-properties и kafka.binder.consumer-properties.

7. Группы потребителей и разделы

Группы потребителей — это набор потребителей, принадлежащих одному и тому же приложению . Потребители из одной и той же группы потребителей имеют одно и то же имя группы.

Давайте обновим application.yml , чтобы добавить имя группы потребителей:

spring:
cloud:
stream:
// ...
bindings:
input:
destination: employee-details
content-type: application/*+avro
group: group-1
// ...

Все потребители равномерно распределяют между собой тематические разделы. Сообщения в разных разделах будут обрабатываться параллельно.

В группе потребителей максимальное количество потребителей, одновременно читающих сообщения, равно количеству разделов. Таким образом, мы можем настроить количество разделов и потребителей, чтобы получить желаемый параллелизм. В общем случае у нас должно быть больше разделов, чем общее количество потребителей во всех репликах нашего сервиса.

7.1. Ключ раздела

При обработке наших сообщений порядок их обработки может быть важен. Когда наши сообщения обрабатываются параллельно, последовательность обработки будет трудно контролировать.

Kafka предоставляет правило, согласно которому в данном разделе сообщения всегда обрабатываются в той последовательности, в которой они прибыли . Итак, если важно, чтобы определенные сообщения обрабатывались в правильном порядке, мы гарантируем, что они попадут в один и тот же раздел.

Мы можем предоставить ключ раздела при отправке сообщения в тему. Сообщения с одним и тем же ключом раздела всегда будут отправляться в один и тот же раздел . Если ключ раздела отсутствует, сообщения будут разбиты на разделы в циклическом режиме.

Попробуем понять это на примере. Представьте, что мы получаем несколько сообщений для сотрудника и хотим обработать все сообщения сотрудника в последовательности. Название отдела и идентификатор сотрудника могут однозначно идентифицировать сотрудника.

Итак, давайте определим ключ раздела с идентификатором сотрудника и названием отдела:

{
"type": "record",
"name": "EmployeeKey",
"namespace": "com.foreach.schema",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "departmentName",
"type": "string"
}]
}

После сборки проекта POJO EmployeeKey будет сгенерирован в пакете com.foreach.schema .

Давайте обновим нашего производителя, чтобы использовать EmployeeKey в качестве ключа раздела:

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

// creating employee details
Employee employee = new Employee();
employee.setId(empId);
// ...

// creating partition key for kafka topic
EmployeeKey employeeKey = new EmployeeKey();
employeeKey.setId(empId);
employeeKey.setDepartmentName("IT");

Message<Employee> message = MessageBuilder.withPayload(employee)
.setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
.build();

processor.output()
.send(message);
}

Здесь мы помещаем ключ раздела в заголовок сообщения.

Теперь тот же раздел будет получать сообщения с тем же идентификатором сотрудника и названием отдела.

7.2. Потребительский параллелизм

Spring Cloud Stream позволяет нам установить параллелизм для потребителя в application.yml :

spring:
cloud:
stream:
// ...
bindings:
input:
destination: employee-details
content-type: application/*+avro
group: group-1
concurrency: 3

Теперь наши потребители будут читать три сообщения из топика одновременно. Другими словами, Spring создаст три разных потока для независимого использования.

8. Заключение

В этой статье мы интегрировали производитель и потребитель против Apache Kafka со схемами Avro и Confluent Schema Registry .

Мы сделали это в одном приложении, но производитель и потребитель могли быть развернуты в разных приложениях и могли иметь свои собственные версии схем, которые синхронизировались через реестр.

Мы рассмотрели, как использовать реализацию клиента Avro и Schema Registry в Spring, а затем увидели, как переключиться на стандартную реализацию сериализации и десериализации Confluent для целей взаимодействия.

Наконец, мы рассмотрели, как разделить нашу тему и убедиться, что у нас есть правильные ключи сообщений, чтобы обеспечить безопасную параллельную обработку наших сообщений.

Полный код, использованный в этой статье, можно найти на GitHub .