Перейти к основному содержимому

Spring Batch — тасклеты против чанков

· 14 мин. чтения

1. Введение

Spring Batch предоставляет два разных способа реализации задания: использование тасклетов и чанков .

В этой статье мы узнаем, как настроить и реализовать оба метода на простом примере из реальной жизни.

2. Зависимости

Начнем с добавления необходимых зависимостей :

<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>4.3.0</version>
<scope>test</scope>
</dependency>

Чтобы получить последнюю версию spring-batch-core и spring-batch-test , обратитесь к Maven Central.

3. Наш вариант использования

Рассмотрим файл CSV со следующим содержимым:

Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992

Первая позиция каждой строки представляет имя человека, а вторая позиция представляет его/ее дату рождения .

Наш вариант использования — создать еще один CSV-файл, содержащий имя и возраст каждого человека :

Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25

Теперь, когда наша предметная область ясна, давайте продолжим и создадим решение, используя оба подхода. Начнем с тасклетов.

4. Подход с тасклетами

4.1. Введение и дизайн

Тасклеты предназначены для выполнения одной задачи в рамках шага. Наша работа будет состоять из нескольких шагов, которые выполняются один за другим. Каждый шаг должен выполнять только одну определенную задачу .

Наша работа будет состоять из трех шагов:

  1. Прочитать строки из входного CSV-файла.
  2. Рассчитайте возраст для каждого человека во входном CSV-файле.
  3. Запишите имя и возраст каждого человека в новый выходной файл CSV.

Теперь, когда общая картина готова, давайте создадим по одному классу на шаг.

LinesReader будет отвечать за чтение данных из входного файла:

public class LinesReader implements Tasklet {
// ...
}

LinesProcessor рассчитает возраст для каждого человека в файле:

public class LinesProcessor implements Tasklet {
// ...
}

Наконец, LinesWriter будет нести ответственность за запись имен и возрастов в выходной файл:

public class LinesWriter implements Tasklet {
// ...
}

На данный момент все наши шаги реализуют интерфейс Tasklet . Это заставит нас реализовать его метод execute :

@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
// ...
}

В этом методе мы добавим логику для каждого шага. Прежде чем начать с этого кода, давайте настроим нашу работу.

4.2. Конфигурация

Нам нужно добавить некоторую конфигурацию в контекст приложения Spring . После добавления стандартного объявления bean-компонента для классов, созданных в предыдущем разделе, мы готовы создать определение нашего задания:

@Configuration
@EnableBatchProcessing
public class TaskletsConfig {

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

@Bean
protected Step readLines() {
return steps
.get("readLines")
.tasklet(linesReader())
.build();
}

@Bean
protected Step processLines() {
return steps
.get("processLines")
.tasklet(linesProcessor())
.build();
}

@Bean
protected Step writeLines() {
return steps
.get("writeLines")
.tasklet(linesWriter())
.build();
}

@Bean
public Job job() {
return jobs
.get("taskletsJob")
.start(readLines())
.next(processLines())
.next(writeLines())
.build();
}

// ...

}

Это означает, что наш «taskletsJob» будет состоять из трех шагов. Первый ( readLines ) выполнит тасклет, определенный в bean-компоненте linesReader, и перейдет к следующему шагу: processLines. ProcessLines выполнит тасклет, определенный в bean-компоненте linesProcessor, и перейдет к последнему шагу: writeLines .

Наш рабочий поток определен, и мы готовы добавить немного логики!

4.3. Модель и утилиты

Поскольку мы будем манипулировать строками в CSV-файле, мы создадим класс Line:

public class Line implements Serializable {

private String name;
private LocalDate dob;
private Long age;

// standard constructor, getters, setters and toString implementation

}

Обратите внимание, что Line реализует Serializable. Это потому, что Line будет действовать как DTO для передачи данных между шагами. Согласно Spring Batch, объекты, которые передаются между шагами, должны быть сериализуемыми .

С другой стороны, мы можем начать думать о чтении и написании строк.

Для этого воспользуемся OpenCSV:

<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>4.1</version>
</dependency>

Найдите последнюю версию OpenCSV в Maven Central.

После включения OpenCSV мы также создадим класс FileUtils . Он предоставит методы для чтения и записи строк CSV:

public class FileUtils {

public Line readLine() throws Exception {
if (CSVReader == null)
initReader();
String[] line = CSVReader.readNext();
if (line == null)
return null;
return new Line(
line[0],
LocalDate.parse(
line[1],
DateTimeFormatter.ofPattern("MM/dd/yyyy")));
}

public void writeLine(Line line) throws Exception {
if (CSVWriter == null)
initWriter();
String[] lineStr = new String[2];
lineStr[0] = line.getName();
lineStr[1] = line
.getAge()
.toString();
CSVWriter.writeNext(lineStr);
}

// ...
}

Обратите внимание, что readLine действует как оболочка над методом openCSV readNext и возвращает объект Line .

Точно так же writeLine оборачивает writeNext OpenCSV , получая объект Line . Полную реализацию этого класса можно найти в GitHub Project .

На данный момент мы готовы начать реализацию каждого шага.

4.4. LinesReader

Давайте продолжим и завершим наш класс LinesReader :

public class LinesReader implements Tasklet, StepExecutionListener {

private final Logger logger = LoggerFactory
.getLogger(LinesReader.class);

private List<Line> lines;
private FileUtils fu;

@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<>();
fu = new FileUtils(
"taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}

@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}

Метод execute LinesReader создает экземпляр FileUtils по пути к входному файлу. Затем добавляет строки в список до тех пор, пока не останется строк для чтения .

Наш класс также реализует StepExecutionListener , который предоставляет два дополнительных метода: beforeStep и afterStep . Мы будем использовать эти методы для инициализации и закрытия вещей до и после запуска выполнения .

Если мы посмотрим на код afterStep , мы заметим строку, в которой список результатов ( строки) помещается в контекст задания, чтобы сделать его доступным для следующего шага:

stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);

На данный момент наш первый шаг уже выполнил свою задачу: загрузить строки CSV в список в памяти. Давайте перейдем ко второму шагу и обработаем их.

4.5. LinesProcessor

LinesProcessor также реализует StepExecutionListener и, конечно же, Tasklet . Это означает, что он также реализует методы beforeStep , execute и afterStep :

public class LinesProcessor implements Tasklet, StepExecutionListener {

private Logger logger = LoggerFactory.getLogger(
LinesProcessor.class);

private List<Line> lines;

@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}

@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(
line.getDob(),
LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}

Нетрудно понять, что он загружает список строк из контекста задания и вычисляет возраст каждого человека .

Нет необходимости помещать в контекст еще один список результатов, так как изменения происходят в том же объекте, что и на предыдущем шаге.

И мы готовы к нашему последнему шагу.

4.6. ЛинииПисатель

Задача LinesWriter состоит в том, чтобы пройтись по списку линий и записать имя и возраст в выходной файл :

public class LinesWriter implements Tasklet, StepExecutionListener {

private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);

private List<Line> lines;
private FileUtils fu;

@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}

@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}

Мы закончили с реализацией нашей работы! Давайте создадим тест, чтобы запустить его и посмотреть результаты.

4.7. Выполнение задания

Чтобы запустить задание, мы создадим тест:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsTest {

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

@Test
public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
throws Exception {

JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}

Аннотация ContextConfiguration указывает на класс конфигурации контекста Spring, который имеет наше определение задания.

Нам нужно добавить пару дополнительных bean-компонентов перед запуском теста:

@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}

@Bean
public JobRepository jobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(transactionManager());
return factory.getObject();
}

@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}

@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}

Все готово! Заходи и запускай тест!

После завершения задания файл output.csv имеет ожидаемое содержимое, а журналы показывают ход выполнения:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

Вот и все для тасклетов. Теперь мы можем перейти к подходу Chunks.

5 . Подход кусков

5.1. Введение и дизайн

Как следует из названия, этот подход выполняет действия над фрагментами данных . То есть вместо одновременного чтения, обработки и записи всех строк он будет читать, обрабатывать и записывать фиксированное количество записей (фрагментов) за раз.

Затем он будет повторять цикл до тех пор, пока в файле не останется данных.

В результате поток будет немного другим:

  1. Пока есть строки:
  • Сделайте для X количество строк:

  • Прочитайте одну строку

  • Обработать одну строку

  • Напишите X строк.

Итак, нам также нужно создать три bean-компонента для подхода, ориентированного на фрагменты :

public class LineReader {
// ...
}
public class LineProcessor {
// ...
}
public class LinesWriter {
// ...
}

Прежде чем перейти к реализации, давайте настроим нашу работу.

5.2. Конфигурация

Определение задания также будет выглядеть иначе:

@Configuration
@EnableBatchProcessing
public class ChunksConfig {

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

@Bean
public ItemReader<Line> itemReader() {
return new LineReader();
}

@Bean
public ItemProcessor<Line, Line> itemProcessor() {
return new LineProcessor();
}

@Bean
public ItemWriter<Line> itemWriter() {
return new LinesWriter();
}

@Bean
protected Step processLines(ItemReader<Line> reader,
ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
return steps.get("processLines").<Line, Line> chunk(2)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}

@Bean
public Job job() {
return jobs
.get("chunksJob")
.start(processLines(itemReader(), itemProcessor(), itemWriter()))
.build();
}

}

В этом случае есть только один шаг, выполняющий только один тасклет.

Однако этот тасклет определяет средства чтения, записи и процессора, которые будут обрабатывать фрагменты данных .

Обратите внимание, что интервал фиксации указывает количество данных, которые должны быть обработаны в одном фрагменте . Наша работа будет читать, обрабатывать и записывать по две строки за раз.

Теперь мы готовы добавить нашу логику блоков!

5.3. LineReader

LineReader будет отвечать за чтение одной записи и возврат экземпляра Line с его содержимым.

Чтобы стать читателем, наш класс должен реализовать интерфейс ItemReader :

public class LineReader implements ItemReader<Line> {
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null)
logger.debug("Read line: " + line.toString());
return line;
}
}

Код прост, он просто читает одну строку и возвращает ее. Мы также реализуем StepExecutionListener для окончательной версии этого класса:

public class LineReader implements 
ItemReader<Line>, StepExecutionListener {

private final Logger logger = LoggerFactory
.getLogger(LineReader.class);

private FileUtils fu;

@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}

@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}

Следует отметить, что beforeStep и afterStep выполняются до и после всего шага соответственно.

5.4. LineProcessor

LineProcessor следует почти той же логике, что и LineReader .

Однако в этом случае мы реализуем ItemProcessor и его метод process() :

public class LineProcessor implements ItemProcessor<Line, Line> {

private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}

}

Метод process() принимает входную строку, обрабатывает ее и возвращает выходную строку . Опять же, мы также реализуем StepExecutionListener:

public class LineProcessor implements 
ItemProcessor<Line, Line>, StepExecutionListener {

private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}

@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug(
"Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}

5.5. ЛинииПисатель

В отличие от считывателя и процессора, LinesWriter запишет целый кусок строк , чтобы получить список строк :

public class LinesWriter implements 
ItemWriter<Line>, StepExecutionListener {

private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);

private FileUtils fu;

@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}

@Override
public void write(List<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
}

Код LinesWriter говорит сам за себя. И снова мы готовы проверить нашу работу.

5.6. Выполнение задания

Мы создадим новый тест, такой же, как тот, который мы создали для подхода с тасклетами:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksTest {

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

@Test
public void givenChunksJob_whenJobEnds_thenStatusCompleted()
throws Exception {

JobExecution jobExecution = jobLauncherTestUtils.launchJob();

assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}

После настройки ChunksConfig , как описано выше для TaskletsConfig , все готово для запуска теста!

Как только задание выполнено, мы видим, что output.csv снова содержит ожидаемый результат, а журналы описывают поток:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

У нас тот же результат и другой поток . Журналы показывают, как задание выполняется при таком подходе.

6. Заключение

Различные контексты покажут необходимость того или иного подхода. В то время как тасклеты кажутся более естественными для сценариев «одна задача за другой», фрагменты предоставляют простое решение для чтения с разбивкой на страницы или ситуаций, когда мы не хотим хранить значительный объем данных в памяти.

Полную реализацию этого примера можно найти в проекте GitHub .