Перейти к основному содержимому

Условный поток в Spring Batch

· 4 мин. чтения

Задача: Сумма двух чисел

Напишите функцию twoSum. Которая получает массив целых чисел nums и целую сумму target, а возвращает индексы двух чисел, сумма которых равна target. Любой набор входных данных имеет ровно одно решение, и вы не можете использовать один и тот же элемент дважды. Ответ можно возвращать в любом порядке...

ANDROMEDA

1. Введение

Мы используем Spring Batch для составления заданий из нескольких шагов, которые считывают, преобразуют и записывают данные. Если шаги в задании имеют несколько путей, подобно использованию оператора if в нашем коде, мы говорим, что поток задания является условным .

В этом руководстве мы рассмотрим два способа создания заданий Spring Batch с условным потоком.

2. Выйти из статуса и статуса партии

Когда мы указываем условный шаг в пакетной среде Spring, мы используем статус выхода шага или задания. Поэтому нам нужно понимать разницу между статусом партии и статусом выхода в наших шагах и заданиях:

  • BatchStatus — это Enum, представляющий статус шага/задания, который используется внутри пакетной среды.
  • Возможные значения: ЗАБРОШЕНО, ЗАВЕРШЕНО, НЕУДАЧНО, ЗАПУЩЕНО, ЗАПУСКАЕТСЯ, ОСТАНОВЛЕНО, ОСТАНОВЛЕНО, НЕИЗВЕСТНО .
  • ExitStatus — это статус шага, когда выполнение завершено, и используется для условного определения потока.

По умолчанию ExitStatus шага или задания совпадает с BatchStatus . Мы также можем установить собственный ExitStatus для управления потоком.

3. Условный поток

Допустим, у нас есть устройство IOT, которое отправляет нам измерения. Измерения нашего устройства представляют собой массивы целых чисел, и нам нужно отправлять уведомления, если какие-либо из наших измерений содержат положительные целые числа.

Другими словами, нам нужно отправить уведомление, когда мы обнаружим положительное измерение.

3.1. ВыходСтатус

Важно отметить, что мы используем статус выхода шага для управления условным потоком .

Чтобы установить статус выхода шага, нам нужно использовать метод setExitStatus объекта StepExecution . Для этого нам нужно создать ItemProcessor , который расширяет ItemListenerSupport и получает StepExecution шага .

Мы используем это, чтобы установить статус выхода нашего шага в NOTIFY , когда мы находим положительное число. Когда мы определяем наш статус выхода на основе данных в пакетном задании, мы можем использовать ItemProcessor .

Давайте посмотрим на наш NumberInfoClassifier , чтобы увидеть три необходимых нам метода:

public class NumberInfoClassifier extends ItemListenerSupport<NumberInfo, Integer>
implements ItemProcessor<NumberInfo, Integer> {

private StepExecution stepExecution;

@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.stepExecution.setExitStatus(new ExitStatus(QUIET));
}

@Override
public Integer process(NumberInfo numberInfo) throws Exception {
return Integer.valueOf(numberInfo.getNumber());
}

@Override
public void afterProcess(NumberInfo item, Integer result) {
super.afterProcess(item, result);
if (item.isPositive()) {
stepExecution.setExitStatus(new ExitStatus(NOTIFY));
}
}
}

Примечание: мы используем ItemProcessor для установки ExitStatus в этом примере, но мы могли бы так же легко сделать это в ItemReader или ItemWriter нашего шага .

Наконец, когда мы создаем нашу работу, мы говорим нашей JobBuilderFactory отправлять уведомления для любого шага, который завершается со статусом NOTIFY :

jobBuilderFactory.get("Number generator - second dataset")
.start(dataProviderStep)
.on("NOTIFY").to(notificationStep)
.end()
.build();

Также обратите внимание, что когда у нас есть дополнительные условные ветки и несколько кодов выхода, мы можем добавить их в наше задание с помощью методов from и on объекта JobBuilderFacotry :

jobBuilderFactory.get("Number generator - second dataset")
.start(dataProviderStep)
.on("NOTIFY").to(notificationStep)
.from(step).on("LOG_ERROR").to(errorLoggingStep)
.end()
.build();

Теперь каждый раз, когда наш ItemProcessor увидит положительное число, он направит нашу работу на запускnotificationStep , который просто выводит сообщение в System.out :

Second Dataset Processor 11
Second Dataset Processor -2
Second Dataset Processor -3
[Number generator - second dataset] contains interesting data!!

Если бы у нас был набор данных без положительного числа, мы бы не увидели наше уведомление о шаге:

Second Dataset Processor -1
Second Dataset Processor -2
Second Dataset Processor -3

3.2. Программное ветвление с помощью JobExecutionDecider

В качестве альтернативы мы можем использовать класс, который реализует JobExecutionDecider для определения потока заданий. Это особенно полезно , если у нас есть внешние факторы для определения потока выполнения .

Чтобы использовать этот метод, нам сначала нужно изменить наш ItemProcessor , чтобы удалить интерфейс ItemListenerSupport и метод @BeforeStep :

public class NumberInfoClassifierWithDecider extends ItemListenerSupport<NumberInfo, Integer>
implements ItemProcessor<NumberInfo, Integer> {

@Override
public Integer process(NumberInfo numberInfo) throws Exception {
return Integer.valueOf(numberInfo.getNumber());
}
}

Затем мы создаем решающий класс, который определяет статус уведомления нашего шага:

public class NumberInfoDecider implements JobExecutionDecider {

private boolean shouldNotify() {
return true;
}

@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
if (shouldNotify()) {
return new FlowExecutionStatus(NOTIFY);
} else {
return new FlowExecutionStatus(QUIET);
}
}
}

Затем мы настраиваем наше задание на использование решающего устройства в потоке:

jobBuilderFactory.get("Number generator - third dataset")
.start(dataProviderStep)
.next(new NumberInfoDecider()).on("NOTIFY").to(notificationStep)
.end()
.build();

4. Вывод

В этом кратком руководстве мы рассмотрели два варианта реализации условных потоков с помощью Spring Batch. Во-первых, мы рассмотрели, как использовать ExitStatus для управления потоком нашей работы.

Затем мы рассмотрели, как программно управлять потоком, определив собственный JobExecutionDecider .

Как всегда, полный исходный код статьи доступен на GitHub .