1. Введение
В этой статье мы продемонстрируем, как использовать стартовые приложения Spring Cloud, которые предоставляют загрузочные и готовые к работе приложения, которые могут служить отправной точкой для будущей разработки.
Проще говоря, приложения Task App Starters предназначены для таких сценариев использования, как миграция базы данных и распределенное тестирование, а Stream App Starters обеспечивают интеграцию с внешними системами.
Всего более 55 участников; ознакомьтесь с официальной документацией здесь и здесь для получения дополнительной информации об этих двух.
Далее мы создадим небольшое распределенное приложение Twitter, которое будет передавать сообщения Twitter в распределенную файловую систему Hadoop.
2. Установка
Мы будем использовать ключ потребителя
и токен доступа
для создания простого приложения Twitter.
Затем мы настроим Hadoop, чтобы мы могли сохранить наш Twitter Stream для будущих целей больших данных.
Наконец, у нас есть возможность либо использовать предоставленные репозитории Spring GitHub для компиляции и сборки автономных компонентов шаблона архитектуры источников
— процессоры-приемники
с использованием Maven, либо объединить источники
, процессоры
и приемники
через их интерфейсы привязки Spring Stream.
Мы рассмотрим оба способа сделать это.
Стоит отметить, что раньше все Stream App Starters были объединены в один большой репозиторий по адресу github.com/spring-cloud/spring-cloud-stream-app-starters . Каждый Starter был упрощен и изолирован.
3. Учетные данные Twitter
Во-первых, давайте настроим наши учетные данные разработчика Twitter. Чтобы получить учетные данные разработчика Twitter, следуйте инструкциям по настройке приложения и созданию маркера доступа из официальной документации для разработчиков Twitter .
В частности, нам понадобятся:
- Потребительский ключ
- Секрет потребительского ключа
- Секрет токена доступа
- Токен доступа
Обязательно держите это окно открытым или запишите их, так как мы будем использовать их ниже!
4. Установка Hadoop
Далее давайте установим Hadoop! Мы можем либо следовать официальной документации, либо просто использовать Docker:
$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1
5. Компиляция стартеров нашего приложения
Чтобы использовать автономные, полностью отдельные компоненты, мы можем загрузить и скомпилировать нужные стартовые приложения Spring Cloud Stream по отдельности из их репозиториев GitHub.
5.1. Стартовое приложение Twitter Spring Cloud Stream
Давайте добавим в наш проект приложение Twitter Spring Cloud Stream Starter ( org.springframework.cloud.stream.app.twitterstream.source ):
git clone https://github.com/spring-cloud-stream-app-starters/twitter.git
Затем запускаем Maven:
./mvnw clean install -PgenerateApps
Полученное скомпилированное стартовое приложение будет доступно в «/target» локального корня проекта.
Затем мы можем запустить этот скомпилированный .jar и передать соответствующие свойства приложения следующим образом:
java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
--accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>
Мы также можем передать наши учетные данные, используя знакомый Spring application.properties:
twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...
5.2. Стартовое приложение HDFS Spring Cloud Stream
Теперь (с уже настроенным Hadoop) давайте добавим в наш проект зависимость HDFS Spring Cloud Stream App Starter ( org.springframework.cloud.stream.app.hdfs.sink ).
Сначала клонируйте соответствующий репозиторий:
git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git
Затем запустите задание Maven:
./mvnw clean install -PgenerateApps
Полученное скомпилированное стартовое приложение будет доступно в «/target» локального корня проекта. Затем мы можем запустить этот скомпилированный .jar и передать соответствующие свойства приложения:
java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/
' hdfs://127.0.0.1:50010/
' является значением по умолчанию для Hadoop, но ваш порт HDFS по умолчанию может отличаться в зависимости от того, как вы настроили свой экземпляр.
Мы можем увидеть список узлов данных (и их текущих портов) по адресу « http://0.0.0.0:50070
», учитывая настройки, которые мы передали ранее.
Мы также можем передать наши учетные данные, используя знакомые Spring application.properties
перед компиляцией, поэтому нам не нужно всегда передавать их через CLI.
Давайте настроим наш application.properties
для использования порта Hadoop по умолчанию:
hdfs.fs-uri=hdfs://127.0.0.1:50010/
6. Использование AggregateApplicationBuilder
В качестве альтернативы мы можем объединить наш Spring Stream Source
и Sink
через org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder
в простое приложение Spring Boot!
Во-первых, мы добавим два стартовых приложения Stream в наш pom.xml
:
<dependencies>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
</dependencies>
Затем мы начнем объединять две наши зависимости Stream App Starter, заключая их в соответствующие подприложения.
6.1. Создание компонентов нашего приложения
Наше SourceApp
указывает источник
, который необходимо преобразовать или использовать:
@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
@InboundChannelAdapter(Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}
Обратите внимание, что мы привязываем наше SourceApp
к org.springframework.cloud.stream.messaging.Source
и внедряем соответствующий класс конфигурации, чтобы получить необходимые настройки из свойств нашей среды.
Далее мы настраиваем простую привязку org.springframework.cloud.stream.messaging.Processor
:
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String processMessage(String payload) {
log.info("Payload received!");
return payload;
}
}
Затем мы создаем нашего потребителя ( Sink
):
@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
@ServiceActivator(inputChannel= Sink.INPUT)
public void loggerSink(Object payload) {
log.info("Received: " + payload);
}
}
Здесь мы привязываем наш SinkApp
к org.springframework.cloud.stream.messaging.Sink
и снова внедряем правильный класс конфигурации, чтобы использовать указанные нами настройки Hadoop.
Наконец, мы объединяем наши SourceApp
, ProcessorApp
и SinkApp
, используя AggregateApplicationBuilder
в нашем основном методе AggregateApp :
@SpringBootApplication
public class AggregateApp {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApp.class).args("--fixedDelay=5000")
.via(ProcessorApp.class)
.to(SinkApp.class).args("--debug=true")
.run(args);
}
}
Как и в любом приложении Spring Boot, мы можем вводить указанные параметры в качестве свойств среды через application.properties или
программно.
Поскольку мы используем среду Spring Stream, мы также можем передать наши аргументы в конструктор AggregateApplicationBuilder
.
6.2. Запуск готового приложения
Затем мы можем скомпилировать и запустить наше приложение, используя следующие инструкции командной строки:
$ mvn install
$ java -jar twitterhdfs.jar
Не забудьте хранить каждый класс @SpringBootApplication
в отдельном пакете (иначе будет выдано несколько разных исключений привязки)! Для получения дополнительной информации о том, как использовать AggregateApplicationBuilder
, ознакомьтесь с официальной документацией .
После того, как мы скомпилируем и запустим наше приложение, мы должны увидеть что-то вроде следующего в нашей консоли (естественно, содержимое будет варьироваться в зависимости от твита):
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
c.b.twitterhdfs.processor.ProcessorApp : Payload received!
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
com.foreach.twitterhdfs.sink.SinkApp : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...
Они демонстрируют корректную работу нашего процессора
и стока
при получении данных от Источника
! В этом примере мы не настроили наш HDFS Sink так, чтобы он делал что-то — он просто напечатает сообщение «Полезная нагрузка получена!»
7. Заключение
В этом руководстве мы узнали, как объединить два замечательных стартовых приложения Spring Stream в один прекрасный пример Spring Boot!
Вот несколько других замечательных официальных статей о стартерах Spring Boot и о том, как создать настраиваемый стартер !
Как всегда, код, использованный в статье, можно найти на GitHub .