Перейти к основному содержимому

ETL с Spring Cloud Data Flow

· 12 мин. чтения

Задача: Наибольшая подстрока палиндром

Для заданной строки s, верните наибольшую подстроку палиндром входящую в s. Подстрока — это непрерывная непустая последовательность символов внутри строки. Стока является палиндромом, если она читается одинаково в обоих направлениях...

ANDROMEDA 42

1. Обзор

Spring Cloud Data Flow — это облачный набор инструментов для создания конвейеров данных в реальном времени и пакетных процессов. Spring Cloud Data Flow готов к использованию в ряде случаев обработки данных, таких как простой импорт/экспорт, обработка ETL, потоковая передача событий и прогнозная аналитика.

В этом руководстве мы рассмотрим пример извлечения, преобразования и загрузки (ETL) в реальном времени с использованием потокового конвейера, который извлекает данные из базы данных JDBC, преобразует их в простые POJO и загружает их в MongoDB.

2. ETL и обработка событийного потока

ETL (извлечение, преобразование и загрузка) обычно называют процессом пакетной загрузки данных из нескольких баз данных и систем в общее хранилище данных. В этом хранилище данных можно выполнять интенсивную обработку данных без ущерба для общей производительности системы.

Однако новые тенденции меняют то, как это делается. ETL по-прежнему играет роль в передаче данных в хранилища данных и озера данных.

В настоящее время это можно сделать с потоками в архитектуре событийного потока с помощью Spring Cloud Data Flow .

3. Весенний облачный поток данных

С помощью Spring Cloud Data Flow (SCDF) разработчики могут создавать конвейеры данных двух видов:

  • Долгоживущие потоковые приложения в реальном времени с использованием Spring Cloud Stream
  • Приложения с кратковременными пакетными задачами, использующие Spring Cloud Task

В этой статье мы рассмотрим первое долгоживущее потоковое приложение на основе Spring Cloud Stream.

3.1. Приложения Spring Cloud Stream

Конвейеры SCDF Stream состоят из шагов, где каждый шаг представляет собой приложение, созданное в стиле Spring Boot с использованием микрофреймворка Spring Cloud Stream. Эти приложения интегрируются промежуточным программным обеспечением для обмена сообщениями, таким как Apache Kafka или RabbitMQ.

Эти приложения подразделяются на источники, процессоры и приемники. По сравнению с процессом ETL можно сказать, что источник — это «извлечение», процессор — это «преобразователь», а приемник — это «загрузочная» часть.

В некоторых случаях мы можем использовать стартер приложения на одном или нескольких этапах конвейера. Это означает, что нам не нужно реализовывать новое приложение для шага, а вместо этого настраивать уже реализованный стартер существующего приложения.

Список стартеров приложений можно найти здесь .

3.2. Сервер потока данных Spring Cloud

Последний элемент архитектуры — Spring Cloud Data Flow Server . Сервер SCDF выполняет развертывание приложений и конвейерного потока с использованием спецификации Spring Cloud Deployer. Эта спецификация поддерживает облачный вариант SCDF за счет развертывания в ряде современных сред выполнения, таких как Kubernetes, Apache Mesos, Yarn и Cloud Foundry.

Кроме того, мы можем запустить поток как локальное развертывание.

Более подробную информацию об архитектуре SCDF можно найти здесь .

4. Настройка среды

Прежде чем мы начнем, нам нужно выбрать части этого сложного развертывания . Первым элементом, который необходимо определить, является сервер SCDF.

Для тестирования мы будем использовать SCDF Server Local для локальной разработки . Для производственного развертывания позже мы можем выбрать облачную среду выполнения, например SCDF Server Kubernetes . Мы можем найти список сред выполнения сервера здесь .

Теперь давайте проверим системные требования для запуска этого сервера.

4.1. Системные Требования

Чтобы запустить сервер SCDF, нам нужно определить и настроить две зависимости:

  • промежуточное ПО для обмена сообщениями и
  • РСУБД.

Что касается промежуточного программного обеспечения для обмена сообщениями, мы будем работать с RabbitMQ и выберем PostgreSQL в качестве СУБД для хранения определений наших конвейерных потоков.

Для запуска RabbitMQ загрузите последнюю версию здесь и запустите экземпляр RabbitMQ, используя конфигурацию по умолчанию, или выполните следующую команду Docker:

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

В качестве последнего шага настройки установите и запустите СУБД PostgreSQL на порту по умолчанию 5432. После этого создайте базу данных, в которой SCDF может хранить определения своих потоков, используя следующий сценарий:

CREATE DATABASE dataflow;

4.2. Локальный сервер Spring Cloud Data Flow

Для запуска локального сервера SCDF мы можем запустить сервер с помощью docker-compose или запустить его как приложение Java.

Здесь мы запустим локальный сервер SCDF как приложение Java. Для настройки приложения мы должны определить конфигурацию как параметры приложения Java. Нам понадобится Java 8 в системном пути.

Чтобы разместить файлы jar и зависимости, нам нужно создать домашнюю папку для нашего сервера SCDF и загрузить в эту папку локальный дистрибутив сервера SCDF. Скачать самый последний дистрибутив SCDF Server Local можно здесь .

Также нам нужно создать папку lib и поместить туда драйвер JDBC. Последняя версия драйвера PostgreSQL доступна здесь .

Наконец, давайте запустим локальный сервер SCDF:

$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
--spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
--spring.datasource.username=postgres_username \
--spring.datasource.password=postgres_password \
--spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.rabbitmq.host=127.0.0.1 \
--spring.rabbitmq.port=5672 \
--spring.rabbitmq.username=guest \
--spring.rabbitmq.password=guest

Мы можем проверить, работает ли он, посмотрев на этот URL:

http://локальный:9393/приборная панель

4.3. Оболочка потока данных Spring Cloud

SCDF Shell — это инструмент командной строки, который упрощает создание и развертывание наших приложений и конвейеров . Эти команды оболочки выполняются через REST API Spring Cloud Data Flow Server .

Загрузите последнюю версию jar-файла в свою домашнюю папку SCDF, доступную здесь. Как только это будет сделано, выполните следующую команду (при необходимости обновите версию):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
____ ____ _ __
/ ___| _ __ _ __(_)_ __ __ _ / ___| | ___ _ _ __| |
\___ \| '_ \| '__| | '_ \ / _` | | | | |/ _ \| | | |/ _` |
___) | |_) | | | | | | | (_| | | |___| | (_) | |_| | (_| |
|____/| .__/|_| |_|_| |_|\__, | \____|_|\___/ \__,_|\__,_|
____ |_| _ __|___/ __________
| _ \ __ _| |_ __ _ | ___| | _____ __ \ \ \ \ \ \
| | | |/ _` | __/ _` | | |_ | |/ _ \ \ /\ / / \ \ \ \ \ \
| |_| | (_| | || (_| | | _| | | (_) \ V V / / / / / / /
|____/ \__,_|\__\__,_| |_| |_|\___/ \_/\_/ /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>

Если вместо « dataflow:> » в последней строке вы получите « server-unknown:>» , вы не используете сервер SCDF на локальном хосте. В этом случае выполните следующую команду для подключения к другому хосту:

server-unknown:>dataflow config server http://{host}

Теперь Shell подключена к серверу SCDF, и мы можем выполнять наши команды.

Первое, что нам нужно сделать в Shell, — это импортировать стартеры приложений. Найдите здесь последнюю версию RabbitMQ+Maven в Spring Boot 2.0.x и выполните следующую команду (снова обновите версию, здесь « Darwin-SR1 », по мере необходимости):

$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

Для проверки установленных приложений выполните следующую команду оболочки:

$ dataflow:> app list

В результате мы должны увидеть таблицу со всеми установленными приложениями.

Кроме того, SCDF предлагает графический интерфейс с именем Flo , к которому мы можем получить доступ по этому адресу: http://localhost:9393/dashboard . Однако его использование выходит за рамки этой статьи.

5. Составление конвейера ETL

Давайте теперь создадим наш потоковый конвейер. Для этого мы будем использовать стартер приложения JDBC Source для извлечения информации из нашей реляционной базы данных.

Кроме того, мы создадим собственный процессор для преобразования информационной структуры и собственный приемник для загрузки наших данных в MongoDB.

5.1. Извлечение — подготовка реляционной базы данных к извлечению

Давайте создадим базу данных с именем crm и таблицу с именем клиента :

CREATE DATABASE crm;
CREATE TABLE customer (
id bigint NOT NULL,
imported boolean DEFAULT false,
customer_name character varying(50),
PRIMARY KEY(id)
)

Обратите внимание, что мы используем флаг import , который будет указывать, какая запись уже была импортирована. Мы также можем хранить эту информацию в другой таблице, если это необходимо.

Теперь давайте вставим некоторые данные:

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2. Преобразование — сопоставление полей JDBC со структурой полей MongoDB

На этапе преобразования мы выполним простой перевод поля customer_name из исходной таблицы в новое имя поля . Здесь можно сделать и другие преобразования, но давайте не будем сокращать пример.

Для этого мы создадим новый проект с именем customer-transform . Самый простой способ сделать это — использовать сайт Spring Initializr для создания проекта. После перехода на веб-сайт выберите группу и имя артефакта. Мы будем использовать com.customer и customer-transform соответственно.

Как только это будет сделано, нажмите кнопку «Создать проект», чтобы загрузить проект. Затем разархивируйте проект и импортируйте его в свою любимую IDE, а также добавьте следующую зависимость в pom.xml :

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Теперь мы готовы начать программировать преобразование имени поля. Для этого мы создадим класс Customer , который будет действовать как адаптер. Этот класс получит имя клиента с помощью метода setName() и выведет его значение с помощью метода getName .

Аннотации @JsonProperty будут выполнять преобразование при десериализации из JSON в Java:

public class Customer {

private Long id;

private String name;

@JsonProperty("customer_name")
public void setName(String name) {
this.name = name;
}

@JsonProperty("name")
public String getName() {
return name;
}

// Getters and Setters
}

Процессор должен получить данные от входа, выполнить преобразование и связать результат с выходным каналом. Давайте создадим класс для этого:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Customer convertToPojo(Customer payload) {

return payload;
}
}

В приведенном выше коде мы можем заметить, что преобразование происходит автоматически. На вход поступают данные в формате JSON, и Джексон десериализует их в объект Customer , используя заданные методы.

Для вывода наоборот, данные сериализуются в JSON с использованием методов get .

5.3. Загрузка — приемник в MongoDB

Аналогично шагу преобразования мы создадим еще один проект maven, теперь с именем customer -mongodb- sink . Снова откройте Spring Initializr , для Group выберите com.customer , а для Artifact выберите customer-mongodb-sink . Затем введите « MongoDB » в поле поиска зависимостей и загрузите проект.

Затем разархивируйте и импортируйте его в свою любимую IDE.

Затем добавьте ту же дополнительную зависимость, что и в проекте преобразования клиента .

Теперь мы создадим еще один класс Customer для получения входных данных на этом этапе:

import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="customer")
public class Customer {

private Long id;
private String name;

// Getters and Setters
}

Для погружения Customer мы создадим класс Listener, который сохранит сущность клиента с помощью CustomerRepository :

@EnableBinding(Sink.class)
public class CustomerListener {

@Autowired
private CustomerRepository repository;

@StreamListener(Sink.INPUT)
public void save(Customer customer) {
repository.save(customer);
}
}

А CustomerRepository в данном случае — это MongoRepository из Spring Data:

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {

}

5.4. Определение потока

Теперь оба пользовательских приложения готовы к регистрации на сервере SCDF. Для этого скомпилируйте оба проекта с помощью команды Maven mvn install .

Затем мы регистрируем их с помощью Spring Cloud Data Flow Shell:

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

Наконец, давайте проверим, хранятся ли приложения в SCDF, запустив команду списка приложений в оболочке:

app list

В результате мы должны увидеть оба приложения в результирующей таблице.

5.4.1. Доменный язык потокового конвейера — DSL

DSL определяет конфигурацию и поток данных между приложениями. SCDF DSL прост. В первом слове мы определяем имя приложения, за которым следуют конфигурации.

Кроме того, синтаксис представляет собой синтаксис Pipeline , вдохновленный Unix , который использует вертикальные полосы, также известные как «каналы», для соединения нескольких приложений:

http --port=8181 | log

Это создает HTTP-приложение, обслуживаемое через порт 8181, которое отправляет любую полученную полезную нагрузку в журнал.

Теперь давайте посмотрим, как создать определение потока DSL источника JDBC.

5.4.2. Определение исходного потока JDBC

Ключевыми конфигурациями источника JDBC являются запрос и обновление . query выберет непрочитанные записи, а update изменит флаг, чтобы предотвратить повторное чтение текущих записей.

Кроме того, мы определим источник JDBC для опроса с фиксированной задержкой в 30 секунд и опроса максимум 1000 строк. Наконец, мы определим конфигурации подключения, такие как драйвер, имя пользователя, пароль и URL-адрес подключения:

jdbc 
--query='SELECT id, customer_name FROM public.customer WHERE imported = false'
--update='UPDATE public.customer SET imported = true WHERE id in (:id)'
--max-rows-per-poll=1000
--fixed-delay=30 --time-unit=SECONDS
--driver-class-name=org.postgresql.Driver
--url=jdbc:postgresql://localhost:5432/crm
--username=postgres
--password=postgres

Дополнительные свойства конфигурации JDBC Source можно найти здесь .

5.4.3. Определение стокового потока клиента MongoDB

Поскольку мы не определяли конфигурации подключения в application.properties клиента -mongodb-sink , мы настроим их с помощью параметров DSL.

Наше приложение полностью основано на MongoDataAutoConfiguration. Вы можете проверить другие возможные конфигурации здесь. По сути, мы определим spring.data.mongodb.uri :

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4. Создание и развертывание потока

Во-первых, чтобы создать окончательное определение потока, вернитесь в оболочку и выполните следующую команду (без разрывов строк, они просто вставлены для удобства чтения):

stream create --name jdbc-to-mongodb 
--definition "jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported=false'
--fixed-delay=30
--max-rows-per-poll=1000
--update='UPDATE customer SET imported=true WHERE id in (:id)'
--time-unit=SECONDS
--password=postgres
--driver-class-name=org.postgresql.Driver
--username=postgres
--url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink
--spring.data.mongodb.uri=mongodb://localhost/main"

Этот поток DSL определяет поток с именем jdbc -to- mongodb . Далее мы развернем поток по его имени :

stream deploy --name jdbc-to-mongodb

Наконец, мы должны увидеть расположение всех доступных журналов в выводе журнала:

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Заключение

В этой статье мы рассмотрели полный пример конвейера данных ETL с использованием Spring Cloud Data Flow.

Наиболее примечательно то, что мы увидели конфигурации запуска приложения, создали потоковый конвейер ETL с использованием Spring Cloud Data Flow Shell и внедрили пользовательские приложения для чтения, преобразования и записи данных.

Как всегда, пример кода можно найти в проекте GitHub.