Перейти к основному содержимому

Spring Batch с использованием Partitioner

· 4 мин. чтения

Задача: Сумма двух чисел

Напишите функцию twoSum. Которая получает массив целых чисел nums и целую сумму target, а возвращает индексы двух чисел, сумма которых равна target. Любой набор входных данных имеет ровно одно решение, и вы не можете использовать один и тот же элемент дважды. Ответ можно возвращать в любом порядке...

ANDROMEDA

1. Обзор

В нашем предыдущем введении в Spring Batch мы представили фреймворк как инструмент пакетной обработки. Мы также изучили детали конфигурации и реализацию однопоточного выполнения задания в одном процессе.

Для реализации задания с некоторой параллельной обработкой предоставляется ряд опций. На более высоком уровне есть два режима параллельной обработки:

  1. Однопроцессный, многопоточный
  2. Мультипроцесс

В этой быстрой статье мы обсудим разделение Step , которое может быть реализовано как для однопроцессных, так и для многопроцессорных заданий.

2. Разделение шага

Spring Batch с разделением предоставляет нам возможность разделить выполнение шага :

./68a4823d5e9ab5b76820d8107edaa15d.png

Обзор разделов

На приведенном выше рисунке показана реализация задания с разделенным шагом .

Есть шаг под названием «Мастер», выполнение которого разделено на несколько «Ведомых» шагов. Эти рабы могут занять место хозяина, и результат все равно останется неизменным. И master, и slave являются экземплярами Step . Подчиненные могут быть удаленными службами или просто локально исполняющими потоками.

При необходимости мы можем передавать данные от мастера к слейву. Метаданные (т. е. JobRepository ) гарантируют, что каждое ведомое устройство выполняется только один раз за одно выполнение задания.

Вот диаграмма последовательности, показывающая, как все это работает:

./cbc0ba7204a6c164325664a519690b58.png

Шаг разделения

Как показано, PartitionStep управляет выполнением. PartitionHandler отвечает за разделение работы «Master» на «Slave» . Самая правая Ступень — ведомая.

3. ПОМ Maven

Зависимости Maven такие же, как упоминалось в нашей предыдущей статье . То есть Spring Core, Spring Batch и зависимость для БД (в нашем случае SQLite ).

4. Конфигурация

В нашей вводной статье мы видели пример преобразования некоторых финансовых данных из файла CSV в файл XML. Давайте расширим тот же пример.

Здесь мы преобразуем финансовую информацию из 5 файлов CSV в соответствующие файлы XML, используя многопоточную реализацию.

Мы можем добиться этого, используя одно разделение Job и Step . У нас будет пять потоков, по одному для каждого файла CSV.

Прежде всего, давайте создадим Job:

@Bean(name = "partitionerJob")
public Job partitionerJob()
throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitioningJob")
.start(partitionStep())
.build();
}

Как мы видим, это задание начинается с PartitioningStep . Это наш основной шаг, который будет разделен на различные подчиненные шаги:

@Bean
public Step partitionStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("partitionStep")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.taskExecutor(taskExecutor())
.build();
}

Здесь мы создадим PartitioningStep, используя StepBuilderFactory . Для этого нам нужно предоставить информацию о SlaveSteps и Partitioner .

Partitioner — это интерфейс, который позволяет определить набор входных значений для каждого из ведомых устройств. Другими словами, здесь находится логика для разделения задач на соответствующие потоки.

Давайте создадим его реализацию, названную CustomMultiResourcePartitioner , где мы поместим имена входных и выходных файлов в ExecutionContext для передачи на каждый подчиненный шаг:

public class CustomMultiResourcePartitioner implements Partitioner {

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
Assert.state(resource.exists(), "Resource does not exist: "
+ resource);
context.putString(keyName, resource.getFilename());
context.putString("opFileName", "output"+k+++".xml");
map.put(PARTITION_KEY + i, context);
i++;
}
return map;
}
}

Мы также создадим bean-компонент для этого класса, где укажем исходный каталог для входных файлов:

@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner
= new CustomMultiResourcePartitioner();
Resource[] resources;
try {
resources = resoursePatternResolver
.getResources("file:src/main/resources/input/*.csv");
} catch (IOException e) {
throw new RuntimeException("I/O problems when resolving"
+ " the input file pattern.", e);
}
partitioner.setResources(resources);
return partitioner;
}

Мы определим подчиненный шаг, как и любой другой шаг с считывателем и писателем. Читатель и писатель будут такими же, как мы видели в нашем вводном примере, за исключением того, что они получат параметр имени файла из StepExecutionContext.

Обратите внимание, что эти bean-компоненты должны иметь пошаговую область действия, чтобы они могли получать параметры stepExecutionContext на каждом шаге. Если они не будут иметь пошаговую область действия, их bean-компоненты будут созданы изначально и не будут принимать имена файлов на уровне шага:

@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
@Value("#{stepExecutionContext[fileName]}") String filename)
throws UnexpectedInputException, ParseException {

FlatFileItemReader<Transaction> reader
= new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens
= {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens);
reader.setResource(new ClassPathResource("input/" + filename));
DefaultLineMapper<Transaction> lineMapper
= new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1);
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller,
@Value("#{stepExecutionContext[opFileName]}") String filename)
throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter
= new StaxEventItemWriter<Transaction>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(new ClassPathResource("xml/" + filename));
return itemWriter;
}

Упомянув читатель и писатель в подчиненном шаге, мы можем передать аргументы как null, потому что эти имена файлов не будут использоваться, так как они получат имена файлов из stepExecutionContext :

@Bean
public Step slaveStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep").<Transaction, Transaction>chunk(1)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.build();
}

5. Вывод

В этом руководстве мы обсудили, как реализовать задание с параллельной обработкой с помощью Spring Batch.

Как всегда, полная реализация этого примера доступна на GitHub .