1. Обзор
В нашем предыдущем введении в Spring Batch мы представили фреймворк как инструмент пакетной обработки. Мы также изучили детали конфигурации и реализацию однопоточного выполнения задания в одном процессе.
Для реализации задания с некоторой параллельной обработкой предоставляется ряд опций. На более высоком уровне есть два режима параллельной обработки:
- Однопроцессный, многопоточный
- Мультипроцесс
В этой быстрой статье мы обсудим разделение Step
, которое может быть реализовано как для однопроцессных, так и для многопроцессорных заданий.
2. Разделение шага
Spring Batch с разделением предоставляет нам возможность разделить выполнение шага
:
На приведенном выше рисунке показана реализация задания
с разделенным шагом
.
Есть шаг
под названием «Мастер», выполнение которого разделено на несколько «Ведомых» шагов. Эти рабы могут занять место хозяина, и результат все равно останется неизменным. И master, и slave являются экземплярами Step
. Подчиненные могут быть удаленными службами или просто локально исполняющими потоками.
При необходимости мы можем передавать данные от мастера к слейву. Метаданные (т. е. JobRepository
) гарантируют, что каждое ведомое устройство выполняется только один раз за одно выполнение задания.
Вот диаграмма последовательности, показывающая, как все это работает:
Как показано, PartitionStep
управляет выполнением. PartitionHandler
отвечает за разделение работы «Master» на «Slave» . Самая правая Ступень
— ведомая.
3. ПОМ Maven
Зависимости Maven такие же, как упоминалось в нашей предыдущей статье . То есть Spring Core, Spring Batch и зависимость для БД (в нашем случае SQLite
).
4. Конфигурация
В нашей вводной статье мы видели пример преобразования некоторых финансовых данных из файла CSV в файл XML. Давайте расширим тот же пример.
Здесь мы преобразуем финансовую информацию из 5 файлов CSV в соответствующие файлы XML, используя многопоточную реализацию.
Мы можем добиться этого, используя одно разделение Job
и Step .
У нас будет пять потоков, по одному для каждого файла CSV.
Прежде всего, давайте создадим Job:
@Bean(name = "partitionerJob")
public Job partitionerJob()
throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitioningJob")
.start(partitionStep())
.build();
}
Как мы видим, это задание
начинается с PartitioningStep
. Это наш основной шаг, который будет разделен на различные подчиненные шаги:
@Bean
public Step partitionStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("partitionStep")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.taskExecutor(taskExecutor())
.build();
}
Здесь мы создадим PartitioningStep, используя StepBuilderFactory
. Для этого нам нужно предоставить информацию о SlaveSteps
и Partitioner
.
Partitioner
— это интерфейс, который позволяет определить набор входных значений для каждого из ведомых устройств. Другими словами, здесь находится логика для разделения задач на соответствующие потоки.
Давайте создадим его реализацию, названную CustomMultiResourcePartitioner
, где мы поместим имена входных и выходных файлов в ExecutionContext
для передачи на каждый подчиненный шаг:
public class CustomMultiResourcePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
Assert.state(resource.exists(), "Resource does not exist: "
+ resource);
context.putString(keyName, resource.getFilename());
context.putString("opFileName", "output"+k+++".xml");
map.put(PARTITION_KEY + i, context);
i++;
}
return map;
}
}
Мы также создадим bean-компонент для этого класса, где укажем исходный каталог для входных файлов:
@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner
= new CustomMultiResourcePartitioner();
Resource[] resources;
try {
resources = resoursePatternResolver
.getResources("file:src/main/resources/input/*.csv");
} catch (IOException e) {
throw new RuntimeException("I/O problems when resolving"
+ " the input file pattern.", e);
}
partitioner.setResources(resources);
return partitioner;
}
Мы определим подчиненный шаг, как и любой другой шаг с считывателем и писателем. Читатель и писатель будут такими же, как мы видели в нашем вводном примере, за исключением того, что они получат параметр имени файла из StepExecutionContext.
Обратите внимание, что эти bean-компоненты должны иметь пошаговую область действия, чтобы они могли получать параметры stepExecutionContext
на каждом шаге. Если они не будут иметь пошаговую область действия, их bean-компоненты будут созданы изначально и не будут принимать имена файлов на уровне шага:
@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
@Value("#{stepExecutionContext[fileName]}") String filename)
throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader
= new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens
= {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens);
reader.setResource(new ClassPathResource("input/" + filename));
DefaultLineMapper<Transaction> lineMapper
= new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1);
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller,
@Value("#{stepExecutionContext[opFileName]}") String filename)
throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter
= new StaxEventItemWriter<Transaction>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(new ClassPathResource("xml/" + filename));
return itemWriter;
}
Упомянув читатель и писатель в подчиненном шаге, мы можем передать аргументы как null, потому что эти имена файлов не будут использоваться, так как они получат имена файлов из stepExecutionContext
:
@Bean
public Step slaveStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep").<Transaction, Transaction>chunk(1)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.build();
}
5. Вывод
В этом руководстве мы обсудили, как реализовать задание с параллельной обработкой с помощью Spring Batch.
Как всегда, полная реализация этого примера доступна на GitHub .