1. Введение
В этом руководстве мы узнаем о Spring Integration Java DSL для создания интеграции приложений.
Мы возьмем интеграцию с перемещением файлов, созданную в разделе « Введение в Spring Integration », и вместо нее будем использовать DSL.
2. Зависимости
Spring Integration Java DSL является частью Spring Integration Core .
Итак, мы можем добавить эту зависимость:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
И для работы с нашим приложением для перемещения файлов нам также понадобится Spring Integration File :
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
3. Spring Интеграция Java DSL
До Java DSL пользователи настраивали компоненты Spring Integration в XML.
DSL представляет несколько быстрых компоновщиков, из которых мы можем легко создать полный конвейер Spring Integration исключительно на Java.
Итак, допустим, мы хотим создать канал, в котором любые данные, поступающие по каналу, записываются в верхнем регистре.
В прошлом мы могли бы сделать:
<int:channel id="input"/>
<int:transformer input-channel="input" expression="payload.toUpperCase()" />
И теперь мы можем вместо этого сделать:
@Bean
public IntegrationFlow upcaseFlow() {
return IntegrationFlows.from("input")
.transform(String::toUpperCase)
.get();
}
4. Приложение для перемещения файлов
Чтобы начать интеграцию с перемещением файлов, нам понадобятся несколько простых строительных блоков.
4.1. Процесс интеграции
Первый строительный блок, который нам нужен, — это поток интеграции, который мы можем получить из построителя IntegrationFlows :
IntegrationFlows.from(...)
from
может принимать несколько типов, но в этом уроке мы рассмотрим только три:
MessageSource
sMessageChannels
иСтрока
с
Вскоре мы поговорим обо всех трех.
После вызова from
нам стали доступны некоторые методы настройки:
IntegrationFlow flow = IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory())
// add more components
.get();
В конечном счете, IntegrationFlows
всегда будет создавать экземпляр IntegrationFlow,
который является конечным продуктом любого приложения Spring Integration.
Этот шаблон ввода, выполнения соответствующих преобразований и выдачи результатов является фундаментальным для всех приложений Spring Integration .
4.2. Описание источника ввода
Во-первых, чтобы переместить файлы, нам нужно указать нашему потоку интеграции, где он должен их искать, и для этого нам нужен MessageSource:
@Bean
public MessageSource<File> sourceDirectory() {
// .. create a message source
}
Проще говоря, MessageSource
— это место, откуда могут приходить сообщения, являющиеся внешними по отношению к приложению .
В частности, нам нужно что-то, что может адаптировать
этот внешний источник к представлению обмена сообщениями Spring. И поскольку эта адаптация
ориентирована на ввод
, их часто называют адаптерами входных каналов.
Зависимость spring-integration-file
дает нам адаптер входного канала, который отлично подходит для нашего варианта использования: FileReadingMessageSource:
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(INPUT_DIR));
return messageSource;
}
Здесь наш FileReadingMessageSource
будет читать каталог, заданный INPUT_DIR
, и создаст из него MessageSource
.
Давайте укажем это как наш источник в вызове IntegrationFlows.from :
IntegrationFlows.from(sourceDirectory());
4.3. Настройка источника ввода
Теперь, если мы думаем об этом как о долгоживущем приложении, мы, вероятно, захотим иметь возможность замечать файлы по мере их поступления , а не просто перемещать файлы, которые уже есть при запуске.
Чтобы облегчить это, from
также может использовать дополнительные конфигураторы
для дальнейшей настройки источника ввода:
IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));
В этом случае мы можем сделать наш источник ввода более устойчивым, сказав Spring Integration опрашивать этот источник — в данном случае нашу файловую систему — каждые 10 секунд.
И, конечно же, это относится не только к нашему файловому источнику ввода, мы можем добавить этот опросник к любому MessageSource
.
4.4. Фильтрация сообщений из источника ввода
Далее, давайте предположим, что мы хотим, чтобы наше приложение для перемещения файлов перемещало только определенные файлы, скажем, файлы изображений с расширением jpg .
Для этого мы можем использовать GenericSelector
:
@Bean
public GenericSelector<File> onlyJpgs() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().endsWith(".jpg");
}
};
}
Итак, давайте снова обновим наш поток интеграции:
IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs());
Или, поскольку этот фильтр такой простой, мы могли бы определить его с помощью лямбда-выражения :
IntegrationFlows.from(sourceDirectory())
.filter(source -> ((File) source).getName().endsWith(".jpg"));
4.5. Обработка сообщений с помощью сервисных активаторов
Теперь, когда у нас есть отфильтрованный список файлов, нам нужно записать их в новое место.
Service Activator —
это
то, к чему мы обращаемся, когда думаем о результатах в Spring Integration.
Воспользуемся активатором службы FileWritingMessageHandler из
файла spring-integration-file
:
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
Здесь наш FileWritingMessageHandler
будет записывать каждую полезную нагрузку Message
, которую он получает, в OUTPUT_DIR
.
Опять же, давайте обновим:
IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory());
И, кстати, обратите внимание на использование setExpectReply
. Поскольку потоки интеграции могут быть двунаправленными, этот вызов указывает, что этот конкретный канал является односторонним.
4.6. Активация нашего потока интеграции
Когда мы добавили все наши компоненты, нам нужно зарегистрировать наш IntegrationFlow
как bean -компонент , чтобы активировать его:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
.filter(onlyJpgs())
.handle(targetDirectory())
.get();
}
Метод get
извлекает экземпляр IntegrationFlow
, который нам нужно зарегистрировать как Spring Bean.
Как только контекст нашего приложения загружается, все наши компоненты, содержащиеся в нашем IntegrationFlow
, активируются.
И теперь наше приложение начнет перемещать файлы из исходного каталога в целевой каталог.
5. Дополнительные компоненты
В нашем приложении для перемещения файлов на основе DSL мы создали адаптер входящего канала, фильтр сообщений и активатор службы.
Давайте рассмотрим несколько других распространенных компонентов Spring Integration и посмотрим, как мы можем их использовать.
5.1. Каналы сообщений
Как упоминалось ранее, канал сообщений
— это еще один способ инициализации потока:
IntegrationFlows.from("anyChannel")
Мы можем прочитать это как «пожалуйста, найдите или создайте компонент канала с именем anyChannel
. Затем прочитайте любые данные, поступающие в anyChannel
из других потоков».
Но на самом деле он более универсален.
Проще говоря, канал отделяет производителей от потребителей, и мы можем думать о нем как о очереди
Java . Канал может быть вставлен в любой точке потока .
Предположим, например, что мы хотим расставить приоритеты для файлов при их перемещении из одного каталога в другой:
@Bean
public PriorityChannel alphabetically() {
return new PriorityChannel(1000, (left, right) ->
((File)left.getPayload()).getName().compareTo(
((File)right.getPayload()).getName()));
}
Затем мы можем вставить вызов для канала
между нашим потоком:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.channel("alphabetically")
.handle(targetDirectory())
.get();
}
Есть десятки каналов на выбор, некоторые из наиболее удобных предназначены для параллелизма, аудита или промежуточного сохранения (например, буферы Kafka или JMS).
Кроме того, каналы могут быть мощными в сочетании с мостами
.
5.2. Мост
Когда мы хотим объединить два канала , мы используем мост.
Давайте представим, что вместо записи непосредственно в выходной каталог наше приложение для перемещения файлов записывает в другой канал:
@Bean
public IntegrationFlow fileReader() {
return IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.channel("holdingTank")
.get();
}
Теперь, поскольку мы просто записали его в канал, мы можем соединить его с другими потоками .
Давайте создадим мост, который опрашивает наш резервуар для хранения сообщений и записывает их в пункт назначения:
@Bean
public IntegrationFlow fileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.handle(targetDirectory())
.get();
}
Опять же, поскольку мы писали в промежуточный канал, теперь мы можем добавить еще один поток , который берет эти же файлы и записывает их с другой скоростью :
@Bean
public IntegrationFlow anotherFileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
.handle(anotherTargetDirectory())
.get();
}
Как мы видим, отдельные мосты могут управлять конфигурацией опроса для разных обработчиков.
Как только наш контекст приложения загружен, у нас теперь есть более сложное приложение в действии, которое начнет перемещать файлы из исходного каталога в два целевых каталога.
6. Заключение
В этой статье мы увидели различные способы использования Spring Integration Java DSL для создания различных конвейеров интеграции.
По сути, мы смогли воссоздать приложение для перемещения файлов из предыдущего руководства, на этот раз с использованием чистой Java.
Кроме того, мы рассмотрели несколько других компонентов, таких как каналы и мосты.
Полный исходный код, использованный в этом руководстве, доступен на Github .