Перейти к основному содержимому

Пользовательские сериализаторы в Apache Kafka

· 5 мин. чтения

1. Введение

Во время передачи сообщений в Apache Kafka клиент и сервер договариваются об использовании общего синтаксического формата. Apache Kafka предоставляет конвертеры по умолчанию (такие как String и Long ), но также поддерживает пользовательские сериализаторы для конкретных случаев использования. В этом уроке мы увидим, как их реализовать.

2. Сериализаторы в Apache Kafka

Сериализация — это процесс преобразования объектов в байты . Десериализация — это обратный процесс — преобразование потока байтов в объект. Короче говоря, он преобразует контент в удобочитаемую и интерпретируемую информацию .

Как мы уже упоминали, Apache Kafka предоставляет сериализаторы по умолчанию для нескольких основных типов и позволяет нам реализовывать собственные сериализаторы:

./19553d4d6f62d0f8bb82573abd5996e5.png

На рисунке выше показан процесс отправки сообщений в топик Kafka по сети. В этом процессе пользовательский сериализатор преобразует объект в байты, прежде чем производитель отправит сообщение в тему. Точно так же он также показывает, как десериализатор преобразует байты обратно в объект, чтобы потребитель мог его правильно обработать.

2.1. Пользовательские сериализаторы

Apache Kafka предоставляет встроенный сериализатор и десериализатор для нескольких основных типов:

Но он также предлагает возможность реализации пользовательских (де)сериализаторов. Чтобы сериализовать наши собственные объекты, мы реализуем интерфейс Serializer . Точно так же, чтобы создать собственный десериализатор, мы реализуем интерфейс [`Deserializer`](https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html) .

Существуют методы, доступные для переопределения для обоих интерфейсов:

  • configure : используется для реализации деталей конфигурации
  • serialize/deserialize : эти методы включают фактическую реализацию нашей пользовательской сериализации и десериализации .
  • close : используйте этот метод, чтобы закрыть сеанс Kafka

3. Реализация пользовательских сериализаторов в Apache Kafka

Apache Kafka предоставляет возможность настройки сериализаторов . Можно реализовать специальные преобразователи не только для значения сообщения, но и для ключа.

3.1. Зависимости

Чтобы реализовать примеры, мы просто добавим зависимость Kafka Consumer API в наш pom.xml :

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>

3.2. Пользовательский сериализатор

Во- первых, мы будем использовать Lombok , чтобы указать пользовательский объект для отправки через Kafka:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
private String message;
private String version;
}

Далее мы реализуем интерфейс Serializer , предоставленный Kafka для отправки сообщений производителем:

@Slf4j
public class CustomSerializer implements Serializer {
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public byte[] serialize(String topic, MessageDto data) {
try {
if (data == null){
System.out.println("Null received at serializing");
return null;
}
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}

@Override
public void close() {
}
}

Мы переопределим метод сериализации интерфейса . Поэтому в нашей реализации мы преобразуем пользовательский объект с помощью Jackson ObjectMapper . Затем мы вернем поток байтов, чтобы правильно отправить сообщение в сеть.

3.3. Пользовательский десериализатор

Точно так же мы реализуем интерфейс Deserializer для потребителя:

@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
private ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public MessageDto deserialize(String topic, byte[] data) {
try {
if (data == null){
System.out.println("Null received at deserializing");
return null;
}
System.out.println("Deserializing...");
return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to MessageDto");
}
}

@Override
public void close() {
}
}

Как и в предыдущем разделе, мы переопределим метод десериализации интерфейса . Следовательно, мы будем преобразовывать поток байтов в пользовательский объект с помощью того же Jackson ObjectMapper .

3.4. Использование примера сообщения

Давайте посмотрим на рабочий пример отправки и получения примера сообщения с помощью настраиваемого сериализатора и десериализатора.

Во-первых, мы создадим и настроим Kafka Producer:

private static KafkaProducer<String, MessageDto> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.foreach.kafka.serdes.CustomSerializer");

return new KafkaProducer(props);
}

Мы настроим свойство сериализатора значений с нашим пользовательским классом , а сериализатор ключей — с StringSerializer по умолчанию .

Во-вторых, мы создадим Kafka Consumer:

private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.foreach.kafka.serdes.CustomDeserializer");

return new KafkaConsumer<>(props);
}

Помимо десериализаторов ключа и значения с нашим пользовательским классом , обязательно включать идентификатор группы. Кроме того, мы установили конфигурацию автоматического сброса смещения на самую раннюю , чтобы убедиться, что производитель отправил все сообщения до того, как потребитель запустится.

После того, как мы создали клиентов производителя и потребителя, пришло время отправить примерное сообщение:

MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();

KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();

И мы можем получить сообщение с потребителем, подписавшись на тему:

AtomicReference<MessageDto> msgCons = new AtomicReference<>();

KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));

ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
msgCons.set(record.value());
System.out.println("Message received " + record.value());
});

consumer.close();

Результат в консоли:

Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)

4. Вывод

В этом руководстве мы показали, как производители используют сериализаторы в Apache Kafka для отправки сообщений по сети. Точно так же мы также показали, как потребители используют десериализаторы для интерпретации полученного сообщения.

Кроме того, мы узнали о доступных сериализаторах по умолчанию и, что наиболее важно, о возможности реализации пользовательских сериализаторов и десериализаторов.

Как всегда, код доступен на GitHub .