1. Введение
Spring Cloud Data Flow
— это облачная модель программирования и эксплуатации для компонуемых микросервисов данных.
С помощью Spring Cloud Data Flow
разработчики могут создавать и организовывать конвейеры данных для распространенных случаев использования, таких как прием данных, аналитика в реальном времени и импорт/экспорт данных.
Эти конвейеры данных бывают двух видов: конвейеры потоковых и пакетных данных.
В первом случае неограниченное количество данных потребляется или создается с помощью промежуточного ПО для обмена сообщениями. В то время как во втором случае кратковременная задача обрабатывает конечный набор данных и затем завершается.
В этой статье речь пойдет о потоковой обработке.
2. Архитектурный обзор
Ключевыми компонентами архитектуры этого типа являются приложения
, сервер потока данных
и целевая среда выполнения.
Кроме того, в дополнение к этим ключевым компонентам у нас также обычно есть оболочка потока данных
и брокер сообщений
в архитектуре.
Давайте рассмотрим все эти компоненты более подробно.
2.1. Приложения
Как правило, конвейер потоковой передачи данных включает в себя потребление событий из внешних систем, обработку данных и постоянство полиглотов. Эти фазы обычно называются Source
, Processor
и Sink
в терминологии Spring Cloud :
- Источник: это приложение, которое потребляет события
- Процессор: потребляет данные из
источника
, выполняет некоторую обработку и передает обработанные данные следующему приложению в конвейере. - Приемник: либо потребляет из
источника
, либо изпроцессора
и записывает данные на желаемый уровень сохраняемости.
Эти приложения могут быть упакованы двумя способами:
- Uber-jar Spring Boot, размещенный в репозитории maven, файле, http или любой другой реализации ресурса Spring (этот метод будет использоваться в этой статье)
- Докер
Многие источники, процессоры и приложения-приемники для распространенных вариантов использования (например, jdbc, hdfs, http, router) уже предоставлены и готовы к использованию командой Spring Cloud Data Flow
.
2.2. Время выполнения
Кроме того, для выполнения этих приложений требуется среда выполнения. Поддерживаемые среды выполнения:
- Облачная Литейная
- Апачская пряжа
- Кубернетес
- Апач Месос
- Локальный сервер для разработки (который будет использоваться в этой статье)
2.3. Сервер потока данных
Компонентом, отвечающим за развертывание приложений в среде выполнения, является Data Flow Server
. Для каждой из целевых сред выполнения предусмотрен исполняемый файл JAR Data Flow Server .
Data Flow Server
отвечает за интерпретацию:
- Потоковый DSL, описывающий логический поток данных через несколько приложений.
- Манифест развертывания, описывающий сопоставление приложений со средой выполнения.
2.4. Оболочка потока данных
Data Flow Shell — это клиент для Data Flow Server. Оболочка позволяет нам выполнять команду DSL, необходимую для взаимодействия с сервером.
Например, DSL для описания потока данных от источника http к приемнику jdbc будет записан как «http | jdbc». Эти имена в DSL регистрируются на сервере потока данных
и сопоставляются с артефактами приложений, которые могут размещаться в репозиториях Maven или Docker.
Spring также предлагает графический интерфейс Flo
для создания и мониторинга конвейеров потоковой передачи данных. Однако его использование выходит за рамки обсуждения этой статьи.
2.5. Брокер сообщений
Как мы видели в примере из предыдущего раздела, мы использовали символ вертикальной черты в определении потока данных. Символ вертикальной черты представляет связь между двумя приложениями через промежуточное ПО для обмена сообщениями.
Это означает, что нам нужен брокер сообщений , работающий в целевой среде.
Два поддерживаемых брокера промежуточного программного обеспечения для обмена сообщениями:
- Апач Кафка
- RabbitMQ
Итак, теперь, когда у нас есть обзор архитектурных компонентов , пришло время построить наш первый конвейер потоковой обработки.
3. Установите брокер сообщений
Как мы видели, приложениям в конвейере для связи требуется промежуточное ПО для обмена сообщениями. Для целей этой статьи мы будем использовать RabbitMQ
.
Для получения полной информации об установке вы можете следовать инструкции на официальном сайте .
4. Локальный сервер потока данных
Чтобы ускорить процесс создания наших приложений, мы будем использовать Spring Initializr ; с его помощью мы можем получить наши приложения Spring Boot
за несколько минут.
После перехода на веб-сайт просто выберите группу
и имя артефакта
.
Как только это будет сделано, нажмите кнопку « Создать проект»
, чтобы начать загрузку артефакта Maven.
После завершения загрузки разархивируйте проект и импортируйте его как проект Maven в выбранную вами IDE.
Давайте добавим в проект зависимость Maven. Поскольку нам понадобятся библиотеки Dataflow Local Server
, давайте добавим зависимость spring-cloud-starter-dataflow-server-local :
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>
Теперь нам нужно аннотировать основной класс Spring Boot аннотацией
@EnableDataFlowServer
:
@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowServerApplication.class, args);
}
}
Это все. Наш локальный сервер потока данных
готов к запуску:
mvn spring-boot:run
Приложение загрузится на порту 9393.
5. Оболочка потока данных
Снова перейдите в Spring Initializr и выберите имя группы
и артефакта
.
После того, как мы загрузили и импортировали проект, давайте добавим зависимость spring-cloud-dataflow-shell :
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>
Теперь нам нужно добавить аннотацию @EnableDataFlowShell
в основной класс Spring Boot :
@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataFlowShellApplication.class, args);
}
}
Теперь мы можем запустить оболочку:
mvn spring-boot:run
После запуска оболочки мы можем ввести команду справки
в приглашении, чтобы увидеть полный список команд, которые мы можем выполнить.
6. Исходное приложение
Точно так же в Initializr мы создадим простое приложение и добавим зависимость Stream Rabbit
с именем spring-cloud-starter-stream-rabbit:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Затем мы добавим аннотацию @EnableBinding(Source.class)
к основному классу Spring Boot :
@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeSourceApplication.class, args);
}
}
Теперь нам нужно определить источник данных, которые необходимо обработать. Этим источником может быть любая потенциально бесконечная рабочая нагрузка (данные датчиков Интернета вещей, круглосуточная обработка событий, прием данных онлайн-транзакций).
В нашем примере приложения мы создаем одно событие (для простоты — новую отметку времени) каждые 10 секунд с помощью Poller
.
Аннотация @InboundChannelAdapter
отправляет сообщение в выходной канал источника, используя возвращаемое значение в качестве полезной нагрузки сообщения:
@Bean
@InboundChannelAdapter(
value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}
Наш источник данных готов.
7. Приложение процессора
Далее мы создадим приложение и добавим зависимость Stream Rabbit .
Затем мы добавим аннотацию @EnableBinding(Processor.class)
к основному классу Spring Boot :
@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeProcessorApplication.class, args);
}
}
Далее нам нужно определить метод для обработки данных, поступающих из исходного приложения.
Чтобы определить трансформатор, нам нужно аннотировать этот метод аннотацией @Transformer
:
@Transformer(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
String date = dateFormat.format(timestamp);
return date;
}
Он преобразует метку времени из «входного» канала в отформатированную дату, которая будет отправлена в «выходной» канал.
8. Приложение Sink
Последним приложением, которое нужно создать, является приложение Sink.
Снова перейдите в Spring Initializr и выберите Group
, имя артефакта
. После загрузки проекта добавим зависимость Stream Rabbit .
Затем добавьте аннотацию @EnableBinding(Sink.class)
к основному классу Spring Boot :
@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowLoggingSinkApplication.class, args);
}
}
Теперь нам нужен метод для перехвата сообщений, поступающих от процессорного приложения.
Для этого нам нужно добавить аннотацию @StreamListener(Sink.INPUT)
к нашему методу:
@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
logger.info("Received: " + date);
}
Метод просто печатает отметку времени, преобразованную в отформатированную дату, в файл журнала.
9. Зарегистрируйте потоковое приложение
Spring Cloud Data Flow Shell позволяет нам зарегистрировать потоковое приложение в реестре приложений с помощью команды app register
.
Мы должны предоставить уникальное имя, тип приложения и URI, которые могут быть разрешены для артефакта приложения. В качестве типа укажите « источник
», « процессор
» или « приемник
».
При предоставлении URI со схемой maven формат должен соответствовать следующему:
maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
Чтобы зарегистрировать ранее созданные приложения Source
, Processor
и Sink
, перейдите в оболочку Spring Cloud Data Flow Shell
и выполните следующие команды из командной строки: ``
app register --name time-source --type source
--uri maven://com.foreach.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT
app register --name time-processor --type processor
--uri maven://com.foreach.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT
app register --name logging-sink --type sink
--uri maven://com.foreach.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT
10. Создайте и разверните поток
Чтобы создать новое определение потока, перейдите в Spring Cloud Data Flow Shell
и выполните следующую команду оболочки:
stream create --name time-to-log
--definition 'time-source | time-processor | logging-sink'
Это определяет поток с именем time-to-log
на основе выражения DSL 'time-source | процессор времени | регистрационный сток'
.
Затем для развертывания потока выполните следующую команду оболочки:
stream deploy --name time-to-log
Data Flow Server
преобразует time - source
, time-processor
и logging-sink
в координаты maven и использует их для запуска приложений time-source
, time-processor
и logging-sink
потока.
Если поток развернут правильно , вы увидите в журналах Data Flow Server
, что модули были запущены и связаны вместе:
2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source
11. Просмотр результата
В этом примере источник просто отправляет текущую временную метку в виде сообщения каждую секунду, процессор форматирует ее, а приемник журнала выводит отформатированную временную метку, используя структуру ведения журнала.
Файлы журнала расположены в каталоге, отображаемом в выходных данных журнала Data Flow Server
, как показано выше. Чтобы увидеть результат, мы можем просмотреть журнал:
tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21
12. Заключение
В этой статье мы увидели, как построить конвейер данных для потоковой обработки с помощью Spring Cloud Data Flow
.
Кроме того, мы увидели роль приложений Source
, Processor
и Sink
внутри потока и то, как подключить и связать этот модуль внутри Data Flow Server
с помощью Data Flow Shell
.
Код примера можно найти в проекте GitHub .