Перейти к основному содержимому

Использование подпотоков в Spring Integration

· 7 мин. чтения

1. Обзор

Spring Integration упрощает использование некоторых шаблонов корпоративной интеграции . Один из этих способов через его DSL .

В этом руководстве мы рассмотрим поддержку подпотоков DSL для упрощения некоторых наших конфигураций.

2. Наша задача

Допустим, у нас есть последовательность целых чисел, которую мы хотим разделить на три разных сегмента.

И если бы мы хотели использовать Spring Integration для этого, мы могли бы начать с создания трех выходных каналов:

  • Такие числа, как 0, 3, 6 и 9, перейдут в MultipleOfThreeChannel .
  • Такие числа, как 1, 4, 7 и 10, перейдут на остатокIsOneChannel .
  • А такие числа, как 2, 5, 8 и 11, идут в остаткеIsTwoChannel .

Чтобы увидеть, насколько полезными могут быть подпотоки, давайте начнем с того, как это будет выглядеть без подпотоков.

Затем мы будем использовать подпотоки, чтобы упростить нашу конфигурацию:

  • опубликоватьПодписатьсяканал
  • маршрутторесипиентс
  • Filter s, чтобы настроить нашу логику if-then
  • Router s, для настройки логики нашего коммутатора

3. Предпосылки

Теперь, прежде чем настраивать наши подпотоки, давайте создадим эти выходные каналы.

Мы создадим эти QueueChannel , так как их немного проще продемонстрировать:

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {

@Bean
QueueChannel multipleOfThreeChannel() {
return new QueueChannel();
}

@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}

@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}

boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}

boolean isRemainderIOne(Integer number) {
return number % 3 == 1;
}

boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
}

В конечном счете, именно здесь окажутся наши сгруппированные числа.

Также обратите внимание, что Spring Integration может легко показаться сложным, поэтому мы добавим несколько вспомогательных методов для удобочитаемости.

4. Решение без подпотоков

Теперь нам нужно определить наши потоки.

Без подпотоков простая идея состоит в том, чтобы определить три отдельных потока интеграции, по одному для каждого типа числа.

Мы отправим одинаковую последовательность сообщений каждому компоненту IntegrationFlow , но выходные сообщения для каждого компонента будут разными.

4.1. Определение компонентов IntegrationFlow

Во-первых, давайте определим каждый bean-компонент IntegrationFlow в нашем классе SubflowConfiguration :

@Bean
public IntegrationFlow multipleOfThreeFlow() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel");
}

Наш поток содержит две конечные точки — разделитель , за которым следует фильтр . ``

Фильтр делает то, на что он похож. Но зачем нам еще и разветвитель? Мы увидим это через минуту, но в основном он разбивает входную коллекцию на отдельные сообщения.

И мы можем, конечно, таким же образом определить еще два bean-компонента IntegrationFlow .

4.2. Шлюзы обмена сообщениями

Для каждого потока нам также нужен Message Gateway .

Проще говоря, они абстрагируют API Spring Integration Messages от вызывающей стороны, аналогично тому, как служба REST может абстрагироваться от HTTP:

@MessagingGateway
public interface NumbersClassifier {

@Gateway(requestChannel = "multipleOfThreeFlow.input")
void multipleOfThree(Collection<Integer> numbers);

@Gateway(requestChannel = "remainderIsOneFlow.input")
void remainderIsOne(Collection<Integer> numbers);

@Gateway(requestChannel = "remainderIsTwoFlow.input")
void remainderIsTwo(Collection<Integer> numbers);

}

Для каждого нам нужно использовать аннотацию @Gateway и указать неявное имя для входного канала, которое представляет собой просто имя компонента, за которым следует «.input» . Обратите внимание, что мы можем использовать это соглашение, потому что мы используем потоки на основе лямбда.

Эти методы являются точками входа в наши потоки.

4.3. Отправка сообщений и проверка вывода

А теперь протестируем:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {

@Autowired
private QueueChannel multipleOfThreeChannel;

@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = multipleOfThreeChannel.receive(0);
assertNull(outMessage);
}
}

Обратите внимание, что мы отправили сообщения в виде списка , поэтому нам понадобился разделитель, чтобы взять одно «сообщение-список» и преобразовать его в несколько «числовых сообщений».

Мы вызываем Receive с помощью o , чтобы получить следующее доступное сообщение без ожидания. Поскольку в нашем списке есть два числа, кратные трем, мы ожидаем, что сможем вызвать его дважды. Третий вызов для получения возвращает null .

Receive , конечно же, возвращает Message , поэтому мы вызываем getPayload для извлечения числа.

Точно так же мы могли бы сделать то же самое для двух других.

Итак, это было решение без подпотоков. У нас есть три отдельных потока для обслуживания и три отдельных метода шлюза.

Теперь мы заменим три bean-компонента IntegrationFlow одним bean-компонентом и три метода шлюза одним.

5. Использование PublishSubscribeChannel

Метод publishSubscribeChannel() рассылает сообщения всем подписывающимся подпотокам. Таким образом, мы можем создать один поток вместо трех.

@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.publishSubscribeChannel(subscription ->
subscription
.subscribe(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel")));
}

Таким образом, подпотоки являются анонимными, что означает, что к ним нельзя обращаться независимо друг от друга.

Теперь у нас есть только один поток, поэтому давайте также отредактируем наш NumbersClassifier :

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

Теперь, поскольку у нас есть только один bean-компонент IntegrationFlow и один метод шлюза, нам нужно отправить наш список только один раз:

@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));

// same assertions as before
}

Обратите внимание, что с этого момента изменится только определение потока интеграции, поэтому мы больше не будем показывать тест.

6. Использование routeToRecipients

Другой способ добиться того же — routeToRecipients , что хорошо, потому что в него встроена фильтрация.

Используя этот метод, мы можем указать как каналы, так и подпотоки для трансляции.

6.1. получатель

В приведенном ниже коде мы укажем multipleof3Channel , resterIs1Channel и resterIsTwoChannel в качестве получателей в соответствии с нашими условиями:

@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.routeToRecipients(route -> route
.<Integer> recipient("multipleOfThreeChannel",
this::isMultipleOfThree)
.<Integer> recipient("remainderIsOneChannel",
this::isRemainderOne)
.<Integer> recipient("remainderIsTwoChannel",
this::isRemainderTwo));
}

Мы также можем вызвать получателя без условия, и routeToRecipients безоговорочно опубликует это место назначения.

6.2. получательFlow

Обратите внимание, что routeToRecipients позволяет нам определить полный поток, как и publishSubscribeChannel.

Давайте изменим приведенный выше код и укажем анонимный подпоток в качестве первого получателя :

.routeToRecipients(route -> route
.recipientFlow(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("mutipleOfThreeChannel"))
...);

Этот подпоток будет получать всю последовательность сообщений, поэтому нам нужно отфильтровать, как раньше, чтобы получить такое же поведение.

Опять же, нам хватило одного bean -компонента IntegrationFlow .

Теперь давайте перейдем к компонентам if-else . Один из них — Фильтр .

7. Использование потоков « если-то »

Мы уже использовали фильтр во всех предыдущих примерах. Хорошая новость заключается в том, что мы можем указать не только условие дальнейшей обработки, но и канал или поток для отбрасываемых сообщений .

Мы можем думать о потоках и каналах сброса как о блоке else :

@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree,
notMultiple -> notMultiple
.discardFlow(oneflow -> oneflow
.<Integer> filter(this::isRemainderOne,
twoflow -> twoflow
.discardChannel("remainderIsTwoChannel"))
.channel("remainderIsOneChannel"))
.channel("multipleofThreeChannel");
}

В этом случае мы реализовали нашу логику маршрутизации if-else :

  • Если число не кратно трем, отбрасывайте эти сообщения в поток отбрасывания; здесь мы используем поток, так как требуется больше логики, чтобы узнать его канал назначения.
  • В потоке сброса, если число не равно единице, отбрасывайте эти сообщения в канал сброса.

8. включение вычисляемого значения

И, наконец, давайте попробуем метод route , который дает нам немного больше контроля, чем routeToRecipients. Это хорошо, потому что Router может разделить поток на любое количество частей, тогда как Filter может сделать только две.

8.1. отображение канала

Давайте определим наш bean-компонент IntegrationFlow :

@Bean
public IntegrationFlow classify() {
return classify -> classify.split()
.<Integer, Integer> route(number -> number % 3,
mapping -> mapping
.channelMapping(0, "multipleOfThreeChannel")
.channelMapping(1, "remainderIsOneChannel")
.channelMapping(2, "remainderIsTwoChannel"));
}

В приведенном выше коде мы вычисляем ключ маршрутизации, выполняя деление:

route(p -> p % 3,...

На основе этого ключа мы маршрутизируем сообщения:

channelMapping(0, "multipleof3Channel")

8.2. subFlowMapping

Теперь, как и в других случаях, мы можем получить больше контроля, указав подпоток, заменив channelMapping на subFlowMapping :

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

Или еще больше контроля, вызывая метод handle вместо метода канала :

.subFlowMapping(2, subflow -> subflow
.<Integer> handle((payload, headers) -> {
// do extra work on the payload
return payload;
}))).channel("remainderIsTwoChannel");

В этом случае подпоток возвращался бы к основному потоку после метода route() , поэтому там нам нужно было бы указать канал resterIsTwoChannel.

9. Заключение

В этом руководстве мы рассмотрели, как фильтровать и маршрутизировать сообщения с помощью подпотоков.

Как обычно, полный исходный код доступен на GitHub .