Перейти к основному содержимому

Создание конвейера данных с помощью Flink и Kafka

· 8 мин. чтения

1. Обзор

Apache Flink — это платформа обработки потоков, которую можно легко использовать с Java. Apache Kafka — это распределенная система обработки потоков, поддерживающая высокую отказоустойчивость.

В этом руководстве мы рассмотрим, как построить конвейер данных с использованием этих двух технологий.

2. Установка

Чтобы установить и настроить Apache Kafka, обратитесь к официальному руководству . После установки мы можем использовать следующие команды для создания новых тем с именами flink_input и flink_output:

bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_output

bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_input

Для этого руководства мы будем использовать конфигурацию по умолчанию и порты по умолчанию для Apache Kafka.

3. Использование флинка

Apache Flink позволяет использовать технологию потоковой обработки в реальном времени. Фреймворк позволяет использовать несколько сторонних систем в качестве источников или приемников потоков .

Во Flink доступны различные коннекторы:

  • Apache Kafka (источник/приемник)
  • Апач Кассандра (раковина)
  • Amazon Kinesis Streams (источник/приемник)
  • Elasticsearch (раковина)
  • Файловая система Hadoop (приемник)
  • RabbitMQ (источник/приемник)
  • Apache NiFi (источник/приемник)
  • API потоковой передачи Twitter (источник)

Чтобы добавить Flink в наш проект, нам нужно включить следующие зависимости Maven:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.5.0</version>
</dependency>

Добавление этих зависимостей позволит нам потреблять и производить темы Kafka и из них. Вы можете найти текущую версию Flink на Maven Central .

4. Потребитель строк Kafka

Чтобы использовать данные из Kafka с помощью Flink, нам нужно указать тему и адрес Kafka. Мы также должны указать идентификатор группы, который будет использоваться для хранения смещений, поэтому мы не всегда будем читать все данные с самого начала.

Давайте создадим статический метод, который упростит создание FlinkKafkaConsumer :

public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup ) {

Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
topic, new SimpleStringSchema(), props);

return consumer;
}

Этот метод принимает тему, kafkaAddress и kafkaGroup и создает FlinkKafkaConsumer , который будет использовать данные из данной темы в виде строки , поскольку мы использовали SimpleStringSchema для декодирования данных.

Число 011 в названии класса относится к версии Kafka.

5. Кафка, продюсер струнных инструментов

Чтобы передать данные в Kafka, нам нужно указать адрес Kafka и тему, которую мы хотим использовать. Опять же, мы можем создать статический метод, который поможет нам создавать производителей для разных тем:

public static FlinkKafkaProducer011<String> createStringProducer(
String topic, String kafkaAddress){

return new FlinkKafkaProducer011<>(kafkaAddress,
topic, new SimpleStringSchema());
}

Этот метод принимает в качестве аргументов только тему и адрес kafkaAddress , поскольку нет необходимости указывать идентификатор группы, когда мы создаем тему для Kafka.

6. Обработка строкового потока

Когда у нас есть полностью работающие потребитель и производитель, мы можем попытаться обработать данные из Kafka, а затем сохранить наши результаты обратно в Kafka. Полный список функций, которые можно использовать для потоковой обработки, можно посмотреть здесь .

В этом примере мы будем писать слова в каждой записи Kafka с заглавной буквы, а затем записывать их обратно в Kafka.

Для этого нам нужно создать пользовательскую MapFunction :

public class WordsCapitalizer implements MapFunction<String, String> {
@Override
public String map(String s) {
return s.toUpperCase();
}
}

После создания функции мы можем использовать ее в потоковой обработке:

public static void capitalize() {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "foreach";
String address = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment
.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
inputTopic, address, consumerGroup);
DataStream<String> stringInputStream = environment
.addSource(flinkKafkaConsumer);

FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
outputTopic, address);

stringInputStream
.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
}

Приложение будет считывать данные из топика flink_input , выполнять операции над потоком, а затем сохранять результаты в топике flink_output в Kafka.

Мы видели, как работать со строками с помощью Flink и Kafka. Но часто требуется выполнять операции над пользовательскими объектами. Мы увидим, как это сделать, в следующих главах.

7. Десериализация пользовательских объектов

Следующий класс представляет простое сообщение с информацией об отправителе и получателе:

@JsonSerialize
public class InputMessage {
String sender;
String recipient;
LocalDateTime sentAt;
String message;
}

Раньше мы использовали SimpleStringSchema для десериализации сообщений от Kafka, но теперь мы хотим десериализовать данные напрямую в пользовательские объекты .

Для этого нам нужна пользовательская DeserializationSchema:

public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> {

static ObjectMapper objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());

@Override
public InputMessage deserialize(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, InputMessage.class);
}

@Override
public boolean isEndOfStream(InputMessage inputMessage) {
return false;
}

@Override
public TypeInformation<InputMessage> getProducedType() {
return TypeInformation.of(InputMessage.class);
}
}

Здесь мы предполагаем, что сообщения хранятся в формате JSON в Kafka.

Поскольку у нас есть поле типа LocalDateTime , нам нужно указать JavaTimeModule, который заботится о сопоставлении объектов LocalDateTime с JSON.

Схемы Flink не могут иметь несериализуемых полей, поскольку все операторы (например, схемы или функции) сериализуются в начале задания.

Аналогичные проблемы есть и в Apache Spark. Одним из известных исправлений этой проблемы является инициализация полей как статических , как мы сделали с ObjectMapper выше. Это не самое красивое решение, но оно относительно простое и выполняет свою работу.

Метод isEndOfStream можно использовать для особого случая, когда поток должен обрабатываться только до тех пор, пока не будут получены определенные данные. Но в нашем случае это не нужно.

8. Пользовательская сериализация объектов

Теперь предположим, что мы хотим, чтобы наша система имела возможность создавать резервную копию сообщений. Мы хотим, чтобы процесс был автоматическим, и каждая резервная копия должна состоять из сообщений, отправленных в течение целого дня.

Кроме того, резервному сообщению должен быть назначен уникальный идентификатор.

Для этого мы можем создать следующий класс:

public class Backup {
@JsonProperty("inputMessages")
List<InputMessage> inputMessages;
@JsonProperty("backupTimestamp")
LocalDateTime backupTimestamp;
@JsonProperty("uuid")
UUID uuid;

public Backup(List<InputMessage> inputMessages,
LocalDateTime backupTimestamp) {
this.inputMessages = inputMessages;
this.backupTimestamp = backupTimestamp;
this.uuid = UUID.randomUUID();
}
}

Обратите внимание, что механизм генерации UUID не идеален, так как допускает дублирование. Однако этого достаточно для объема данного примера.

Мы хотим сохранить наш объект Backup как JSON в Kafka, поэтому нам нужно создать нашу SerializationSchema :

public class BackupSerializationSchema
implements SerializationSchema<Backup> {

ObjectMapper objectMapper;
Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);

@Override
public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) {
objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());
}
try {
return objectMapper.writeValueAsString(backupMessage).getBytes();
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
logger.error("Failed to parse JSON", e);
}
return new byte[0];
}
}

9. Сообщения с метками времени

Поскольку мы хотим создать резервную копию всех сообщений за каждый день, сообщениям нужна отметка времени.

Flink предоставляет три различных временных характеристики EventTime, ProcessingTime и IngestionTime.

В нашем случае нам нужно использовать время отправки сообщения, поэтому мы будем использовать EventTime.

Чтобы использовать EventTime , нам нужен TimestampAssigner , который будет извлекать метки времени из наших входных данных :

public class InputMessageTimestampAssigner 
implements AssignerWithPunctuatedWatermarks<InputMessage> {

@Override
public long extractTimestamp(InputMessage element,
long previousElementTimestamp) {
ZoneId zoneId = ZoneId.systemDefault();
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
}

@Nullable
@Override
public Watermark checkAndGetNextWatermark(InputMessage lastElement,
long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1500);
}
}

Нам нужно преобразовать LocalDateTime в EpochSecond , так как это формат, ожидаемый Flink. После назначения временных меток все операции, основанные на времени, будут использовать время из поля sentAt для работы.

Поскольку Flink ожидает, что метки времени будут в миллисекундах, а toEpochSecond() возвращает время в секундах, нам нужно было умножить его на 1000, чтобы Flink правильно создавал окна.

Флинк определяет концепцию водяного знака. Водяные знаки полезны в случае, если данные не приходят в том порядке, в котором они были отправлены. Водяной знак определяет максимально допустимую задержку для обработки элементов.

Элементы с отметками времени ниже водяного знака вообще не будут обрабатываться.

10. Создание временных окон

Чтобы убедиться, что наша резервная копия собирает только сообщения, отправленные в течение одного дня, мы можем использовать метод timeWindowAll в потоке, который разделит сообщения на окна.

Однако нам по-прежнему нужно будет собирать сообщения из каждого окна и возвращать их как резервные копии .

Для этого нам понадобится пользовательская AggregateFunction :

public class BackupAggregator 
implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {

@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}

@Override
public List<InputMessage> add(
InputMessage inputMessage,
List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}

@Override
public Backup getResult(List<InputMessage> inputMessages) {
return new Backup(inputMessages, LocalDateTime.now());
}

@Override
public List<InputMessage> merge(List<InputMessage> inputMessages,
List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}

11. Объединение резервных копий

После назначения правильных временных меток и реализации нашей AggregateFunction мы, наконец, можем взять наш ввод Kafka и обработать его:

public static void createBackup () throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "foreach";
String kafkaAddress = "192.168.99.100:9092";
StreamExecutionEnvironment environment
= StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
= createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();

flinkKafkaConsumer.assignTimestampsAndWatermarks(
new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer
= createBackupProducer(outputTopic, kafkaAddress);

DataStream<InputMessage> inputMessagesStream
= environment.addSource(flinkKafkaConsumer);

inputMessagesStream
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);

environment.execute();
}

12. Заключение

В этой статье мы представили, как создать простой конвейер данных с помощью Apache Flink и Apache Kafka.

Как всегда, код можно найти на Github .