Перейти к основному содержимому

Exactly Once Обработка в Kafka с Java

· 6 мин. чтения

1. Обзор

В этом руководстве мы рассмотрим, как Kafka обеспечивает однократную доставку между приложениями-производителями и приложениями-потребителями с помощью недавно представленного API транзакций.

Кроме того, мы будем использовать этот API для реализации транзакционных производителей и потребителей для достижения сквозной однократной доставки в примере WordCount.

2. Доставка сообщений в Kafka

Из-за различных сбоев системы обмена сообщениями не могут гарантировать доставку сообщений между приложениями производителя и потребителя. В зависимости от того, как клиентские приложения взаимодействуют с такими системами, возможна следующая семантика сообщений:

  • Если система обмена сообщениями никогда не будет дублировать сообщение, но может пропустить случайное сообщение, мы называем это не более одного раза .
  • Или, если он никогда не пропустит сообщение, но может дублировать случайное сообщение, мы вызываем его по крайней мере один раз
  • Но если он всегда доставляет все сообщения без дублирования, то это ровно один раз.

Первоначально Kafka поддерживала только доставку сообщений «не более одного раза» и «не менее одного раза».

Однако введение транзакций между брокерами Kafka и клиентскими приложениями обеспечивает однократную доставку в Kafka . Чтобы лучше понять это, давайте быстро рассмотрим транзакционный клиентский API.

3. Зависимости Maven

Для работы с API транзакций нам понадобится Java-клиент Kafka в нашем pom:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>

4. Транзакционный цикл « потребление-преобразование-производство ».

В нашем примере мы собираемся использовать сообщения из входной темы, предложения .

Затем для каждого предложения мы посчитаем каждое слово и отправим подсчет отдельных слов в выходную тему counts .

В этом примере мы предположим, что в теме предложений уже есть данные о транзакциях.

4.1. Транзакционно-осведомленный продюсер

Итак, давайте сначала добавим типичного производителя Kafka.

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");

Кроме того, нам нужно указать transactional.id и включить idempotence :

producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");

KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Поскольку мы включили идемпотентность, Kafka будет использовать этот идентификатор транзакции как часть своего алгоритма для дедупликации любого сообщения , отправляемого этим производителем , обеспечивая идемпотентность.

Проще говоря, если производитель случайно отправляет одно и то же сообщение в Kafka более одного раза, эти настройки позволяют ему это заметить.

Все, что нам нужно сделать, это убедиться, что идентификатор транзакции уникален для каждого производителя , хотя и неизменен при перезапусках.

4.2. Включение производителя для транзакций

Когда мы будем готовы, нам также нужно вызвать initTransaction , чтобы подготовить производителя к использованию транзакций:

producer.initTransactions();

Это регистрирует производителя у брокера как производителя, который может использовать транзакции, идентифицируя его по его transactional.id и порядковому номеру, или эпохе . В свою очередь, брокер будет использовать их для записи любых действий в журнал транзакций.

И, следовательно, брокер удалит из этого журнала любые действия, принадлежащие производителю с тем же идентификатором транзакции и более ранней эпохой, предполагая, что они относятся к несуществующим транзакциям.

4.3. Потребитель, осведомленный о транзакциях

Когда мы потребляем, мы можем прочитать все сообщения в разделе темы по порядку. Тем не менее, мы можем указать с помощьюisolation.level , что мы должны подождать, чтобы прочитать транзакционные сообщения, пока соответствующая транзакция не будет зафиксирована :

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(singleton(“sentences”));

Использование значения read_committed гарантирует, что мы не прочитаем никакие транзакционные сообщения до завершения транзакции.

По умолчанию для параметраisolation.level установлено значение read_uncommitted .

4.4. Потребление и преобразование посредством транзакции

Теперь, когда у нас есть и производитель, и потребитель, настроенные на транзакционную запись и чтение, мы можем потреблять записи из нашей входной темы и подсчитывать каждое слово в каждой записи:

ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap =
records.records(new TopicPartition("input", 0))
.stream()
.flatMap(record -> Stream.of(record.value().split(" ")))
.map(word -> Tuple.of(word, 1))
.collect(Collectors.toMap(tuple ->
tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));

Обратите внимание, что в приведенном выше коде нет ничего транзакционного. Но, поскольку мы использовали read_committed, это означает, что никакие сообщения, которые были записаны во входную тему в той же транзакции, не будут прочитаны этим потребителем, пока они все не будут записаны.

Теперь мы можем отправить рассчитанное количество слов в выходную тему.

Давайте посмотрим, как мы можем получить наши результаты, также транзакционно.

4.5. Отправить API

Чтобы отправить наши счетчики как новые сообщения, но в той же транзакции, мы вызываем beginTransaction :

producer.beginTransaction();

Затем мы можем записать каждый из них в нашу тему «counts», где ключ — это слово, а count — это значение:

wordCountMap.forEach((key,value) -> 
producer.send(new ProducerRecord<String,String>("counts",key,value.toString())));

Обратите внимание: поскольку производитель может разбивать данные по ключу, это означает, что транзакционные сообщения могут охватывать несколько разделов, каждый из которых читается отдельными потребителями. Поэтому брокер Kafka будет хранить список всех обновленных разделов для транзакции.

Также обратите внимание, что в транзакции производитель может использовать несколько потоков для параллельной отправки записей .

4.6. Совершение зачетов

И, наконец, нам нужно зафиксировать наши смещения, которые мы только что закончили потреблять. С транзакциями мы фиксируем смещения обратно во входную тему, из которой мы их считываем, как обычно. Также мы отправляем их в транзакцию производителя.

Мы можем сделать все это за один вызов, но сначала нам нужно вычислить смещения для каждого раздела темы:

Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}

Обратите внимание, что то, что мы фиксируем в транзакции, — это предстоящее смещение, то есть нам нужно добавить 1.

Затем мы можем отправить наши рассчитанные смещения в транзакцию:

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");

4.7. Совершение или отмена транзакции

И, наконец, мы можем зафиксировать транзакцию, которая атомарно запишет смещения в топик Consumer_offsets , а также в саму транзакцию:

producer.commitTransaction();

Это сбрасывает любое буферизованное сообщение в соответствующие разделы. Кроме того, брокер Kafka делает все сообщения в этой транзакции доступными для потребителей.

Конечно, если во время обработки что-то пойдет не так, например, если мы поймаем исключение, мы можем вызвать abortTransaction:

try {
// ... read from input topic
// ... transform
// ... write to output topic
producer.commitTransaction();
} catch ( Exception e ) {
producer.abortTransaction();
}

И удалите все буферизованные сообщения и удалите транзакцию из брокера.

Если мы не зафиксируем и не прервем транзакцию до настроенного брокером max.transaction.timeout.ms, брокер Kafka прервет транзакцию сам. Значение по умолчанию для этого свойства — 900 000 миллисекунд или 15 минут.

5. Другие циклы « потребление-преобразование-производство »

То, что мы только что видели, — это базовый цикл потребления-преобразования-производства , который читает и записывает в один и тот же кластер Kafka.

И наоборот, приложения, которые должны читать и записывать в разные кластеры Kafka, должны использовать более старые API commitSync и commitAsync . Как правило, приложения сохраняют смещения потребителей во внешнем хранилище состояний для поддержания транзакционности.

6. Заключение

Для приложений, критически важных для данных, сквозная однократная обработка часто является обязательной.

В этом руководстве мы увидели, как мы используем Kafka, чтобы сделать именно это, используя транзакции , и мы реализовали пример подсчета слов на основе транзакций, чтобы проиллюстрировать принцип.

Не стесняйтесь проверить все примеры кода на GitHub .