Перейти к основному содержимому

Начало работы с потоковой обработкой с помощью Spring Cloud Data Flow

· 9 мин. чтения

Задача: Сумма двух чисел

Напишите функцию twoSum. Которая получает массив целых чисел nums и целую сумму target, а возвращает индексы двух чисел, сумма которых равна target. Любой набор входных данных имеет ровно одно решение, и вы не можете использовать один и тот же элемент дважды. Ответ можно возвращать в любом порядке...

ANDROMEDA

1. Введение

Spring Cloud Data Flow — это облачная модель программирования и эксплуатации для компонуемых микросервисов данных.

С помощью Spring Cloud Data Flow разработчики могут создавать и организовывать конвейеры данных для распространенных случаев использования, таких как прием данных, аналитика в реальном времени и импорт/экспорт данных.

Эти конвейеры данных бывают двух видов: конвейеры потоковых и пакетных данных.

В первом случае неограниченное количество данных потребляется или создается с помощью промежуточного ПО для обмена сообщениями. В то время как во втором случае кратковременная задача обрабатывает конечный набор данных и затем завершается.

В этой статье речь пойдет о потоковой обработке.

2. Архитектурный обзор

Ключевыми компонентами архитектуры этого типа являются приложения , сервер потока данных и целевая среда выполнения.

Кроме того, в дополнение к этим ключевым компонентам у нас также обычно есть оболочка потока данных и брокер сообщений в архитектуре.

Давайте рассмотрим все эти компоненты более подробно.

2.1. Приложения

Как правило, конвейер потоковой передачи данных включает в себя потребление событий из внешних систем, обработку данных и постоянство полиглотов. Эти фазы обычно называются Source , Processor и Sink в терминологии Spring Cloud :

  • Источник: это приложение, которое потребляет события
  • Процессор: потребляет данные из источника , выполняет некоторую обработку и передает обработанные данные следующему приложению в конвейере.
  • Приемник: либо потребляет из источника , либо из процессора и записывает данные на желаемый уровень сохраняемости.

Эти приложения могут быть упакованы двумя способами:

  • Uber-jar Spring Boot, размещенный в репозитории maven, файле, http или любой другой реализации ресурса Spring (этот метод будет использоваться в этой статье)
  • Докер

Многие источники, процессоры и приложения-приемники для распространенных вариантов использования (например, jdbc, hdfs, http, router) уже предоставлены и готовы к использованию командой Spring Cloud Data Flow .

2.2. Время выполнения

Кроме того, для выполнения этих приложений требуется среда выполнения. Поддерживаемые среды выполнения:

  • Облачная Литейная
  • Апачская пряжа
  • Кубернетес
  • Апач Месос
  • Локальный сервер для разработки (который будет использоваться в этой статье)

2.3. Сервер потока данных

Компонентом, отвечающим за развертывание приложений в среде выполнения, является Data Flow Server . Для каждой из целевых сред выполнения предусмотрен исполняемый файл JAR Data Flow Server .

Data Flow Server отвечает за интерпретацию:

  • Потоковый DSL, описывающий логический поток данных через несколько приложений.
  • Манифест развертывания, описывающий сопоставление приложений со средой выполнения.

2.4. Оболочка потока данных

Data Flow Shell — это клиент для Data Flow Server. Оболочка позволяет нам выполнять команду DSL, необходимую для взаимодействия с сервером.

Например, DSL для описания потока данных от источника http к приемнику jdbc будет записан как «http | jdbc». Эти имена в DSL регистрируются на сервере потока данных и сопоставляются с артефактами приложений, которые могут размещаться в репозиториях Maven или Docker.

Spring также предлагает графический интерфейс Flo для создания и мониторинга конвейеров потоковой передачи данных. Однако его использование выходит за рамки обсуждения этой статьи.

2.5. Брокер сообщений

Как мы видели в примере из предыдущего раздела, мы использовали символ вертикальной черты в определении потока данных. Символ вертикальной черты представляет связь между двумя приложениями через промежуточное ПО для обмена сообщениями.

Это означает, что нам нужен брокер сообщений , работающий в целевой среде.

Два поддерживаемых брокера промежуточного программного обеспечения для обмена сообщениями:

  • Апач Кафка
  • RabbitMQ

Итак, теперь, когда у нас есть обзор архитектурных компонентов , пришло время построить наш первый конвейер потоковой обработки.

3. Установите брокер сообщений

Как мы видели, приложениям в конвейере для связи требуется промежуточное ПО для обмена сообщениями. Для целей этой статьи мы будем использовать RabbitMQ .

Для получения полной информации об установке вы можете следовать инструкции на официальном сайте .

4. Локальный сервер потока данных

Чтобы ускорить процесс создания наших приложений, мы будем использовать Spring Initializr ; с его помощью мы можем получить наши приложения Spring Boot за несколько минут.

После перехода на веб-сайт просто выберите группу и имя артефакта .

Как только это будет сделано, нажмите кнопку « Создать проект» , чтобы начать загрузку артефакта Maven.

./ceb8357c90319bded44b02ce73988141.jpg

После завершения загрузки разархивируйте проект и импортируйте его как проект Maven в выбранную вами IDE.

Давайте добавим в проект зависимость Maven. Поскольку нам понадобятся библиотеки Dataflow Local Server , давайте добавим зависимость spring-cloud-starter-dataflow-server-local :

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>

Теперь нам нужно аннотировать основной класс Spring Boot аннотацией @EnableDataFlowServer :

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowServerApplication.class, args);
}
}

Это все. Наш локальный сервер потока данных готов к запуску:

mvn spring-boot:run

Приложение загрузится на порту 9393.

5. Оболочка потока данных

Снова перейдите в Spring Initializr и выберите имя группы и артефакта .

После того, как мы загрузили и импортировали проект, давайте добавим зависимость spring-cloud-dataflow-shell :

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>

Теперь нам нужно добавить аннотацию @EnableDataFlowShell в основной класс Spring Boot :

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {

public static void main(String[] args) {
SpringApplication.run(SpringDataFlowShellApplication.class, args);
}
}

Теперь мы можем запустить оболочку:

mvn spring-boot:run

После запуска оболочки мы можем ввести команду справки в приглашении, чтобы увидеть полный список команд, которые мы можем выполнить.

6. Исходное приложение

Точно так же в Initializr мы создадим простое приложение и добавим зависимость Stream Rabbit с именем spring-cloud-starter-stream-rabbit:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Затем мы добавим аннотацию @EnableBinding(Source.class) к основному классу Spring Boot :

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {

public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeSourceApplication.class, args);
}
}

Теперь нам нужно определить источник данных, которые необходимо обработать. Этим источником может быть любая потенциально бесконечная рабочая нагрузка (данные датчиков Интернета вещей, круглосуточная обработка событий, прием данных онлайн-транзакций).

В нашем примере приложения мы создаем одно событие (для простоты — новую отметку времени) каждые 10 секунд с помощью Poller .

Аннотация @InboundChannelAdapter отправляет сообщение в выходной канал источника, используя возвращаемое значение в качестве полезной нагрузки сообщения:

@Bean
@InboundChannelAdapter(
value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

Наш источник данных готов.

7. Приложение процессора

Далее мы создадим приложение и добавим зависимость Stream Rabbit .

Затем мы добавим аннотацию @EnableBinding(Processor.class) к основному классу Spring Boot :

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeProcessorApplication.class, args);
}
}

Далее нам нужно определить метод для обработки данных, поступающих из исходного приложения.

Чтобы определить трансформатор, нам нужно аннотировать этот метод аннотацией @Transformer :

@Transformer(inputChannel = Processor.INPUT, 
outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
String date = dateFormat.format(timestamp);
return date;
}

Он преобразует метку времени из «входного» канала в отформатированную дату, которая будет отправлена в «выходной» канал.

8. Приложение Sink

Последним приложением, которое нужно создать, является приложение Sink.

Снова перейдите в Spring Initializr и выберите Group , имя артефакта . После загрузки проекта добавим зависимость Stream Rabbit .

Затем добавьте аннотацию @EnableBinding(Sink.class) к основному классу Spring Boot :

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowLoggingSinkApplication.class, args);
}
}

Теперь нам нужен метод для перехвата сообщений, поступающих от процессорного приложения.

Для этого нам нужно добавить аннотацию @StreamListener(Sink.INPUT) к нашему методу:

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
logger.info("Received: " + date);
}

Метод просто печатает отметку времени, преобразованную в отформатированную дату, в файл журнала.

9. Зарегистрируйте потоковое приложение

Spring Cloud Data Flow Shell позволяет нам зарегистрировать потоковое приложение в реестре приложений с помощью команды app register .

Мы должны предоставить уникальное имя, тип приложения и URI, которые могут быть разрешены для артефакта приложения. В качестве типа укажите « источник », « процессор » или « приемник ».

При предоставлении URI со схемой maven формат должен соответствовать следующему:

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

Чтобы зарегистрировать ранее созданные приложения Source , Processor и Sink , перейдите в оболочку Spring Cloud Data Flow Shell и выполните следующие команды из командной строки: ``

app register --name time-source --type source 
--uri maven://com.foreach.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT

app register --name time-processor --type processor
--uri maven://com.foreach.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT

app register --name logging-sink --type sink
--uri maven://com.foreach.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT

10. Создайте и разверните поток

Чтобы создать новое определение потока, перейдите в Spring Cloud Data Flow Shell и выполните следующую команду оболочки:

stream create --name time-to-log 
--definition 'time-source | time-processor | logging-sink'

Это определяет поток с именем time-to-log на основе выражения DSL 'time-source | процессор времени | регистрационный сток' .

Затем для развертывания потока выполните следующую команду оболочки:

stream deploy --name time-to-log

Data Flow Server преобразует time - source , time-processor и logging-sink в координаты maven и использует их для запуска приложений time-source , time-processor и logging-sink потока.

Если поток развернут правильно , вы увидите в журналах Data Flow Server , что модули были запущены и связаны вместе:

2016-08-24 12:29:10.516  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Просмотр результата

В этом примере источник просто отправляет текущую временную метку в виде сообщения каждую секунду, процессор форматирует ее, а приемник журнала выводит отформатированную временную метку, используя структуру ведения журнала.

Файлы журнала расположены в каталоге, отображаемом в выходных данных журнала Data Flow Server , как показано выше. Чтобы увидеть результат, мы можем просмотреть журнал:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Заключение

В этой статье мы увидели, как построить конвейер данных для потоковой обработки с помощью Spring Cloud Data Flow .

Кроме того, мы увидели роль приложений Source , Processor и Sink внутри потока и то, как подключить и связать этот модуль внутри Data Flow Server с помощью Data Flow Shell .

Код примера можно найти в проекте GitHub .