1. Обзор
Apache Flink — это платформа обработки потоков, которую можно легко использовать с Java. Apache Kafka — это распределенная система обработки потоков, поддерживающая высокую отказоустойчивость.
В этом руководстве мы рассмотрим, как построить конвейер данных с использованием этих двух технологий.
2. Установка
Чтобы установить и настроить Apache Kafka, обратитесь к официальному руководству . После установки мы можем использовать следующие команды для создания новых тем с именами flink_input
и flink_output:
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_output
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_input
Для этого руководства мы будем использовать конфигурацию по умолчанию и порты по умолчанию для Apache Kafka.
3. Использование флинка
Apache Flink позволяет использовать технологию потоковой обработки в реальном времени. Фреймворк позволяет использовать несколько сторонних систем в качестве источников или приемников потоков .
Во Flink доступны различные коннекторы:
- Apache Kafka (источник/приемник)
- Апач Кассандра (раковина)
- Amazon Kinesis Streams (источник/приемник)
- Elasticsearch (раковина)
- Файловая система Hadoop (приемник)
- RabbitMQ (источник/приемник)
- Apache NiFi (источник/приемник)
- API потоковой передачи Twitter (источник)
Чтобы добавить Flink в наш проект, нам нужно включить следующие зависимости Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.5.0</version>
</dependency>
Добавление этих зависимостей позволит нам потреблять и производить темы Kafka и из них. Вы можете найти текущую версию Flink на Maven Central .
4. Потребитель строк Kafka
Чтобы использовать данные из Kafka с помощью Flink, нам нужно указать тему и адрес Kafka. Мы также должны указать идентификатор группы, который будет использоваться для хранения смещений, поэтому мы не всегда будем читать все данные с самого начала.
Давайте создадим статический метод, который упростит создание FlinkKafkaConsumer
:
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup ) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
topic, new SimpleStringSchema(), props);
return consumer;
}
Этот метод принимает тему, kafkaAddress
и kafkaGroup
и создает FlinkKafkaConsumer
, который будет использовать данные из данной темы в виде строки
, поскольку мы использовали SimpleStringSchema
для декодирования данных.
Число 011
в названии класса относится к версии Kafka.
5. Кафка, продюсер струнных инструментов
Чтобы передать данные в Kafka, нам нужно указать адрес Kafka и тему, которую мы хотим использовать. Опять же, мы можем создать статический метод, который поможет нам создавать производителей для разных тем:
public static FlinkKafkaProducer011<String> createStringProducer(
String topic, String kafkaAddress){
return new FlinkKafkaProducer011<>(kafkaAddress,
topic, new SimpleStringSchema());
}
Этот метод принимает в качестве аргументов только тему
и адрес kafkaAddress
, поскольку нет необходимости указывать идентификатор группы, когда мы создаем тему для Kafka.
6. Обработка строкового потока
Когда у нас есть полностью работающие потребитель и производитель, мы можем попытаться обработать данные из Kafka, а затем сохранить наши результаты обратно в Kafka. Полный список функций, которые можно использовать для потоковой обработки, можно посмотреть здесь .
В этом примере мы будем писать слова в каждой записи Kafka с заглавной буквы, а затем записывать их обратно в Kafka.
Для этого нам нужно создать пользовательскую MapFunction
:
public class WordsCapitalizer implements MapFunction<String, String> {
@Override
public String map(String s) {
return s.toUpperCase();
}
}
После создания функции мы можем использовать ее в потоковой обработке:
public static void capitalize() {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "foreach";
String address = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment
.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
inputTopic, address, consumerGroup);
DataStream<String> stringInputStream = environment
.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
outputTopic, address);
stringInputStream
.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
}
Приложение будет считывать данные из топика flink_input
, выполнять операции над потоком, а затем сохранять результаты в топике flink_output
в Kafka.
Мы видели, как работать со строками с помощью Flink и Kafka. Но часто требуется выполнять операции над пользовательскими объектами. Мы увидим, как это сделать, в следующих главах.
7. Десериализация пользовательских объектов
Следующий класс представляет простое сообщение с информацией об отправителе и получателе:
@JsonSerialize
public class InputMessage {
String sender;
String recipient;
LocalDateTime sentAt;
String message;
}
Раньше мы использовали SimpleStringSchema
для десериализации сообщений от Kafka, но теперь мы хотим десериализовать данные напрямую в пользовательские объекты .
Для этого нам нужна пользовательская DeserializationSchema:
public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> {
static ObjectMapper objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());
@Override
public InputMessage deserialize(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, InputMessage.class);
}
@Override
public boolean isEndOfStream(InputMessage inputMessage) {
return false;
}
@Override
public TypeInformation<InputMessage> getProducedType() {
return TypeInformation.of(InputMessage.class);
}
}
Здесь мы предполагаем, что сообщения хранятся в формате JSON в Kafka.
Поскольку у нас есть поле типа LocalDateTime
, нам нужно указать JavaTimeModule,
который заботится о сопоставлении объектов LocalDateTime
с JSON.
Схемы Flink не могут иметь несериализуемых полей, поскольку все операторы (например, схемы или функции) сериализуются в начале задания.
Аналогичные проблемы есть и в Apache Spark. Одним из известных исправлений этой проблемы является инициализация полей как статических
, как мы сделали с ObjectMapper
выше. Это не самое красивое решение, но оно относительно простое и выполняет свою работу.
Метод isEndOfStream
можно использовать для особого случая, когда поток должен обрабатываться только до тех пор, пока не будут получены определенные данные. Но в нашем случае это не нужно.
8. Пользовательская сериализация объектов
Теперь предположим, что мы хотим, чтобы наша система имела возможность создавать резервную копию сообщений. Мы хотим, чтобы процесс был автоматическим, и каждая резервная копия должна состоять из сообщений, отправленных в течение целого дня.
Кроме того, резервному сообщению должен быть назначен уникальный идентификатор.
Для этого мы можем создать следующий класс:
public class Backup {
@JsonProperty("inputMessages")
List<InputMessage> inputMessages;
@JsonProperty("backupTimestamp")
LocalDateTime backupTimestamp;
@JsonProperty("uuid")
UUID uuid;
public Backup(List<InputMessage> inputMessages,
LocalDateTime backupTimestamp) {
this.inputMessages = inputMessages;
this.backupTimestamp = backupTimestamp;
this.uuid = UUID.randomUUID();
}
}
Обратите внимание, что механизм генерации UUID не идеален, так как допускает дублирование. Однако этого достаточно для объема данного примера.
Мы хотим сохранить наш объект Backup
как JSON в Kafka, поэтому нам нужно создать нашу SerializationSchema
:
public class BackupSerializationSchema
implements SerializationSchema<Backup> {
ObjectMapper objectMapper;
Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
@Override
public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) {
objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());
}
try {
return objectMapper.writeValueAsString(backupMessage).getBytes();
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
logger.error("Failed to parse JSON", e);
}
return new byte[0];
}
}
9. Сообщения с метками времени
Поскольку мы хотим создать резервную копию всех сообщений за каждый день, сообщениям нужна отметка времени.
Flink предоставляет три различных временных характеристики EventTime, ProcessingTime
и IngestionTime.
В нашем случае нам нужно использовать время отправки сообщения, поэтому мы будем использовать EventTime.
Чтобы использовать EventTime
, нам нужен TimestampAssigner
, который будет извлекать метки времени из наших входных данных :
public class InputMessageTimestampAssigner
implements AssignerWithPunctuatedWatermarks<InputMessage> {
@Override
public long extractTimestamp(InputMessage element,
long previousElementTimestamp) {
ZoneId zoneId = ZoneId.systemDefault();
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
}
@Nullable
@Override
public Watermark checkAndGetNextWatermark(InputMessage lastElement,
long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1500);
}
}
Нам нужно преобразовать LocalDateTime
в EpochSecond
, так как это формат, ожидаемый Flink. После назначения временных меток все операции, основанные на времени, будут использовать время из поля sentAt
для работы.
Поскольку Flink ожидает, что метки времени будут в миллисекундах, а toEpochSecond()
возвращает время в секундах, нам нужно было умножить его на 1000, чтобы Flink правильно создавал окна.
Флинк определяет концепцию водяного знака.
Водяные знаки полезны в случае, если данные не приходят в том порядке, в котором они были отправлены. Водяной знак определяет максимально допустимую задержку для обработки элементов.
Элементы с отметками времени ниже водяного знака вообще не будут обрабатываться.
10. Создание временных окон
Чтобы убедиться, что наша резервная копия собирает только сообщения, отправленные в течение одного дня, мы можем использовать метод timeWindowAll
в потоке, который разделит сообщения на окна.
Однако нам по-прежнему нужно будет собирать сообщения из каждого окна и возвращать их как резервные копии
.
Для этого нам понадобится пользовательская AggregateFunction
:
public class BackupAggregator
implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(
InputMessage inputMessage,
List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
return new Backup(inputMessages, LocalDateTime.now());
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages,
List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}
11. Объединение резервных копий
После назначения правильных временных меток и реализации нашей AggregateFunction
мы, наконец, можем взять наш ввод Kafka и обработать его:
public static void createBackup () throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "foreach";
String kafkaAddress = "192.168.99.100:9092";
StreamExecutionEnvironment environment
= StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
= createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer.assignTimestampsAndWatermarks(
new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer
= createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream
= environment.addSource(flinkKafkaConsumer);
inputMessagesStream
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
12. Заключение
В этой статье мы представили, как создать простой конвейер данных с помощью Apache Flink и Apache Kafka.
Как всегда, код можно найти на Github .