Перейти к основному содержимому

Пакетная обработка Java EE 7

· 13 мин. чтения

1. Введение

Представьте, что нам пришлось вручную выполнять такие задачи, как обработка платежных ведомостей, расчет процентов и выставление счетов. Это стало бы довольно скучным, подверженным ошибкам и бесконечным списком ручных задач!

В этом руководстве мы рассмотрим Java Batch Processing ( JSR 352 ), часть платформы Jakarta EE и отличную спецификацию для автоматизации подобных задач. Он предлагает разработчикам приложений модель разработки надежных систем пакетной обработки, чтобы они могли сосредоточиться на бизнес-логике.

2. Зависимости Maven

Поскольку JSR 352 — это всего лишь спецификация, нам нужно включить его API и реализацию , например jberet :

<dependency>
<groupId>javax.batch</groupId>
<artifactId>javax.batch-api</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-core</artifactId>
<version>1.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-support</artifactId>
<version>1.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-se</artifactId>
<version>1.0.2.Final</version>
</dependency>

Мы также добавим базу данных в памяти, чтобы мы могли рассмотреть несколько более реалистичных сценариев.

3. Ключевые понятия

JSR 352 вводит несколько концепций, которые мы можем рассматривать следующим образом:

./3e3212f95ba0c7deeb6f7418120157c0.png

Давайте сначала определим каждую часть:

  • Начиная слева, у нас есть JobOperator . Он управляет всеми аспектами обработки заданий, такими как запуск, остановка и перезапуск.
  • Далее у нас есть Работа . Задание — это логический набор шагов; он инкапсулирует весь пакетный процесс
  • Задание будет содержать от 1 до n шагов . Каждый шаг представляет собой независимую последовательную единицу работы. Шаг состоит из чтения ввода, обработки этого ввода и записи вывода .
  • И последнее, но не менее важное: у нас есть JobRepository , в котором хранится текущая информация о заданиях. Это помогает отслеживать задания, их состояние и результаты их выполнения.

Шаги содержат немного больше деталей, поэтому давайте рассмотрим их далее. Сначала мы рассмотрим шаги Chunk , а затем Batchlet .

4. Создание чанка

Как было сказано ранее, чанк — это разновидность шага . Мы будем часто использовать кусок для выражения операции, которая выполняется снова и снова, скажем, над набором элементов. Это что-то вроде промежуточных операций из Java Streams.

При описании чанка нам нужно указать, откуда брать элементы, как их обрабатывать и куда их потом отправлять.

4.1. Чтение предметов

Чтобы читать элементы, нам нужно реализовать ItemReader.

В этом случае мы создадим считыватель, который будет просто выдавать числа от 1 до 10:

@Named
public class SimpleChunkItemReader extends AbstractItemReader {
private Integer[] tokens;
private Integer count;

@Inject
JobContext jobContext;

@Override
public Integer readItem() throws Exception {
if (count >= tokens.length) {
return null;
}

jobContext.setTransientUserData(count);
return tokens[count++];
}

@Override
public void open(Serializable checkpoint) throws Exception {
tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 };
count = 0;
}
}

Здесь мы просто читаем внутреннее состояние класса. Но, конечно, readItem может извлекать из базы данных , из файловой системы или какого-то другого внешнего источника.

Обратите внимание, что мы сохраняем часть этого внутреннего состояния, используя JobContext#setTransientUserData() , который пригодится позже.

Также обратите внимание на параметр контрольной точки . Мы тоже поднимем это снова.

4.2. Обработка элементов

Конечно, причина, по которой мы используем фрагменты, заключается в том, что мы хотим выполнить какую-то операцию над нашими элементами!

Каждый раз, когда мы возвращаем null из обработчика элементов, мы удаляем этот элемент из пакета.

Итак, скажем здесь, что мы хотим сохранить только четные числа. Мы можем использовать ItemProcessor , который отбрасывает нечетные, возвращая null :

@Named
public class SimpleChunkItemProcessor implements ItemProcessor {
@Override
public Integer processItem(Object t) {
Integer item = (Integer) t;
return item % 2 == 0 ? item : null;
}
}

processItem будет вызываться один раз для каждого элемента, который генерирует наш ItemReader .

4.3. Написание предметов

Наконец, задание вызовет ItemWriter , чтобы мы могли записать наши преобразованные элементы:

@Named
public class SimpleChunkWriter extends AbstractItemWriter {
List<Integer> processed = new ArrayList<>();
@Override
public void writeItems(List<Object> items) throws Exception {
items.stream().map(Integer.class::cast).forEach(processed::add);
}
}

Как долго предметы ? Через мгновение мы определим размер фрагмента, который будет определять размер списка, отправляемого в writeItems .

4.4. Определение фрагмента в задании

Теперь мы объединяем все это в файл XML, используя JSL или язык спецификации заданий. Обратите внимание, что мы перечислим наш считыватель, процессор, блокировщик, а также размер блока:

<job id="simpleChunk">
<step id="firstChunkStep" >
<chunk item-count="3">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
</job>

Размер чанка определяет, как часто прогресс в чанке фиксируется в репозитории заданий , что важно для гарантии завершения в случае сбоя части системы.

Нам нужно поместить этот файл в META-INF/batch-jobs для . jar и в WEB-INF/classes/META-INF/batch-jobs для файлов .war .

Мы дали нашей работе идентификатор «simpleChunk», так что давайте попробуем это в модульном тесте.

Теперь задания выполняются асинхронно, что затрудняет их тестирование. В примере обязательно проверьте наш BatchTestHelper , который опрашивает и ожидает завершения задания:

@Test
public void givenChunk_thenBatch_completesWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleChunk", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

Так вот что такое куски. Теперь давайте взглянем на пакеты.

5. Создание пакетной обработки

Не все четко вписывается в итеративную модель. Например, у нас может быть задача, которую нам просто нужно вызвать один раз, выполнить до завершения и вернуть статус выхода.

Договор на батч довольно прост:

@Named
public class SimpleBatchLet extends AbstractBatchlet {

@Override
public String process() throws Exception {
return BatchStatus.COMPLETED.toString();
}
}

Как и JSL:

<job id="simpleBatchLet">
<step id="firstStep" >
<batchlet ref="simpleBatchLet"/>
</step>
</job>

И мы можем протестировать его, используя тот же подход, что и раньше:

@Test
public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

Итак, мы рассмотрели несколько различных способов реализации шагов.

Теперь давайте рассмотрим механизмы маркировки и гарантии прогресса.

6. Пользовательская контрольная точка

Неудачи обязательно случаются в середине работы. Должны ли мы просто начать все сначала, или мы можем как-то начать с того места, на котором остановились?

Как следует из названия, контрольные точки помогают нам периодически устанавливать закладки в случае сбоя.

По умолчанию конец обработки чанка является естественной контрольной точкой .

Однако мы можем настроить его с помощью нашего собственного CheckpointAlgorithm :

@Named
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {

@Inject
JobContext jobContext;

@Override
public boolean isReadyToCheckpoint() throws Exception {
int counterRead = (Integer) jobContext.getTransientUserData();
return counterRead % 5 == 0;
}
}

Помните счётчик, который мы поместили в переходные данные ранее? Здесь мы можем вывести его с помощью JobContext#getTransientUserData , чтобы указать, что мы хотим фиксировать каждое 5-е обработанное число.

Без этого фиксация происходила бы в конце каждого фрагмента или, в нашем случае, в каждом третьем числе.

И затем мы сопоставляем это с директивой checkout-algorithm в нашем XML под нашим чаном :

<job id="customCheckPoint">
<step id="firstChunkStep" >
<chunk item-count="3" checkpoint-policy="custom">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
<checkpoint-algorithm ref="customCheckPoint"/>
</chunk>
</step>
</job>

Давайте протестируем код, снова отметив, что некоторые шаблонные шаги скрыты в BatchTestHelper :

@Test
public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception {
// ... start job and wait for completion

jobOperator.getStepExecutions(executionId)
.stream()
.map(BatchTestHelper::getCommitCount)
.forEach(count -> assertEquals(3L, count.longValue()));
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

Таким образом, мы могли бы ожидать, что количество коммитов будет равно 2, поскольку у нас есть десять элементов, и мы настроили коммиты для каждого 5-го элемента. Но в конце фреймворк делает еще один окончательный коммит чтения, чтобы убедиться, что все обработано, что приводит нас к 3.

Далее давайте посмотрим, как обрабатывать ошибки.

7. Обработка исключений

По умолчанию оператор задания пометит наше задание как НЕ ПРОШЁТОЕ в случае исключения.

Давайте изменим наш считыватель элементов, чтобы убедиться, что он не работает:

@Override
public Integer readItem() throws Exception {
if (tokens.hasMoreTokens()) {
String tempTokenize = tokens.nextToken();
throw new RuntimeException();
}
return null;
}

А затем протестируйте:

@Test
public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception {
// ... start job and wait for completion
assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
}

Но мы можем переопределить это поведение по умолчанию несколькими способами:

  • skip-limit указывает количество исключений, которые этот шаг будет игнорировать перед сбоем
  • retry-limit указывает, сколько раз оператор задания должен повторить шаг, прежде чем произойдет сбой.
  • skippable-exception-class указывает набор исключений, которые будут игнорироваться при обработке чанков.

Итак, мы можем отредактировать нашу работу так, чтобы она игнорировала RuntimeException , а также некоторые другие, просто для иллюстрации:

<job id="simpleErrorSkipChunk" >
<step id="errorStep" >
<chunk checkpoint-policy="item" item-count="3" skip-limit="3" retry-limit="3">
<reader ref="myItemReader"/>
<processor ref="myItemProcessor"/>
<writer ref="myItemWriter"/>
<skippable-exception-classes>
<include class="java.lang.RuntimeException"/>
<include class="java.lang.UnsupportedOperationException"/>
</skippable-exception-classes>
<retryable-exception-classes>
<include class="java.lang.IllegalArgumentException"/>
<include class="java.lang.UnsupportedOperationException"/>
</retryable-exception-classes>
</chunk>
</step>
</job>

И теперь наш код пройдет:

@Test
public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception {
// ... start job and wait for completion
jobOperator.getStepExecutions(executionId).stream()
.map(BatchTestHelper::getProcessSkipCount)
.forEach(skipCount -> assertEquals(1L, skipCount.longValue()));
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8. Выполнение нескольких шагов

Ранее мы упоминали, что задание может состоять из любого количества шагов, так что давайте посмотрим на это сейчас.

8.1. Запуск следующего шага

По умолчанию каждый шаг является последним шагом в задании .

Чтобы выполнить следующий шаг в пакетном задании, нам нужно будет явно указать, используя следующий атрибут в определении шага:

<job id="simpleJobSequence">
<step id="firstChunkStepStep1" next="firstBatchStepStep2">
<chunk item-count="3">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
<step id="firstBatchStepStep2" >
<batchlet ref="simpleBatchLet"/>
</step>
</job>

Если мы забудем этот атрибут, то следующий шаг в последовательности не будет выполнен.

И мы можем увидеть, как это выглядит в API:

@Test
public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception {
// ... start job and wait for completion
assertEquals(2 , jobOperator.getStepExecutions(executionId).size());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8.2. Потоки

Последовательность шагов также может быть инкапсулирована в поток . Когда поток завершается, весь поток переходит к исполнительному элементу . Кроме того, элементы внутри потока не могут переходить к элементам вне потока.

Мы можем, скажем, выполнить два шага внутри потока, а затем перевести этот поток в изолированный шаг:

<job id="flowJobSequence">
<flow id="flow1" next="firstBatchStepStep3">
<step id="firstChunkStepStep1" next="firstBatchStepStep2">
<chunk item-count="3">
<reader ref="simpleChunkItemReader" />
<processor ref="simpleChunkItemProcessor" />
<writer ref="simpleChunkWriter" />
</chunk>
</step>
<step id="firstBatchStepStep2">
<batchlet ref="simpleBatchLet" />
</step>
</flow>
<step id="firstBatchStepStep3">
<batchlet ref="simpleBatchLet" />
</step>
</job>

И мы по-прежнему можем видеть выполнение каждого шага независимо:

@Test
public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception {
// ... start job and wait for completion

assertEquals(3, jobOperator.getStepExecutions(executionId).size());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8.3. Решения

У нас также есть поддержка if/else в виде решений . Решения обеспечивают настраиваемый способ определения последовательности шагов, потоков и разбиений .

Подобно шагам, он работает с переходными элементами, такими как следующий , который может направлять или завершать выполнение задания.

Давайте посмотрим, как задание может быть настроено:

<job id="decideJobSequence">
<step id="firstBatchStepStep1" next="firstDecider">
<batchlet ref="simpleBatchLet" />
</step>
<decision id="firstDecider" ref="deciderJobSequence">
<next on="two" to="firstBatchStepStep2"/>
<next on="three" to="firstBatchStepStep3"/>
</decision>
<step id="firstBatchStepStep2">
<batchlet ref="simpleBatchLet" />
</step>
<step id="firstBatchStepStep3">
<batchlet ref="simpleBatchLet" />
</step>
</job>

Любой элемент решения должен быть сконфигурирован с классом, который реализует Decider . Его задача — вернуть решение в виде строки .

Каждое следующее внутреннее решение похоже на case в операторе switch .

8.4. Сплиты

Разделения удобны, поскольку они позволяют нам выполнять потоки одновременно:

<job id="splitJobSequence">
<split id="split1" next="splitJobSequenceStep3">
<flow id="flow1">
<step id="splitJobSequenceStep1">
<batchlet ref="simpleBatchLet" />
</step>
</flow>
<flow id="flow2">
<step id="splitJobSequenceStep2">
<batchlet ref="simpleBatchLet" />
</step>
</flow>
</split>
<step id="splitJobSequenceStep3">
<batchlet ref="simpleBatchLet" />
</step>
</job>

Конечно, это означает, что заказ не гарантирован .

Давайте подтвердим, что они все еще запускаются. Шаги потока будут выполняться в произвольном порядке, но изолированный шаг всегда будет последним:

@Test
public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception {
// ... start job and wait for completion
List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);

assertEquals(3, stepExecutions.size());
assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

9. Разделение задания

Мы также можем использовать пакетные свойства в нашем коде Java, которые были определены в нашей работе.

Их можно разделить на три уровня: задание, шаг и пакетный артефакт .

Давайте посмотрим на некоторые примеры того, как они потребляли.

Когда мы хотим использовать свойства на уровне задания:

@Inject
JobContext jobContext;
...
jobProperties = jobContext.getProperties();
...

Это также можно использовать на уровне шага:

@Inject
StepContext stepContext;
...
stepProperties = stepContext.getProperties();
...

Когда мы хотим использовать свойства на уровне пакетного артефакта:

@Inject
@BatchProperty(name = "name")
private String nameString;

Это удобно с разделами.

Видите ли, с разделением мы можем запускать потоки одновременно. Но мы также можем разделить шаг на n наборов элементов или задать отдельные входные данные, что дает нам еще один способ разделить работу на несколько потоков.

Чтобы понять, какой сегмент работы должен выполнять каждый раздел, мы можем объединить свойства с разделами:

<job id="injectSimpleBatchLet">
<properties>
<property name="jobProp1" value="job-value1"/>
</properties>
<step id="firstStep">
<properties>
<property name="stepProp1" value="value1" />
</properties>
<batchlet ref="injectSimpleBatchLet">
<properties>
<property name="name" value="#{partitionPlan['name']}" />
</properties>
</batchlet>
<partition>
<plan partitions="2">
<properties partition="0">
<property name="name" value="firstPartition" />
</properties>
<properties partition="1">
<property name="name" value="secondPartition" />
</properties>
</plan>
</partition>
</step>
</job>

10. Остановить и перезапустить

Теперь это все для определения рабочих мест. Теперь давайте поговорим об управлении ими.

В наших модульных тестах мы уже видели, что можем получить экземпляр JobOperator из BatchRuntime :

JobOperator jobOperator = BatchRuntime.getJobOperator();

И тогда мы можем начать работу:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

Однако мы также можем остановить задание:

jobOperator.stop(executionId);

И, наконец, мы можем перезапустить задание:

executionId = jobOperator.restart(executionId, new Properties());

Давайте посмотрим, как мы можем остановить запущенное задание:

@Test
public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobOperator.stop(executionId);
jobExecution = BatchTestHelper.keepTestStopped(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
}

И если пакет ОСТАНОВЛЕН , то мы можем перезапустить его:

@Test
public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() {
// ... start and stop the job

assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties());
jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId));

assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

11. Получение заданий

При отправке пакетного задания среда выполнения пакета создает экземпляр JobExecution для его отслеживания .

Чтобы получить JobExecution для идентификатора выполнения, мы можем использовать метод JobOperator#getJobExecution(executionId) .

Кроме того, StepExecution предоставляет полезную информацию для отслеживания выполнения шага .

Чтобы получить StepExecution для идентификатора выполнения, мы можем использовать метод JobOperator#getStepExecutions(executionId) .

Отсюда мы можем получить несколько метрик об этом шаге с помощью StepExecution#getMetrics:

@Test
public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception {
// ... start job and wait for completion
assertTrue(jobOperator.getJobNames().contains("simpleChunk"));
assertTrue(jobOperator.getParameters(executionId).isEmpty());
StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0);
Map<Metric.MetricType, Long> metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue());
assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue());
assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue());
assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue());
// ... and many more!
}

12. Недостатки

JSR 352 является мощным инструментом, хотя ему не хватает нескольких аспектов:

  • Кажется, что не хватает читателей и писателей, которые могут обрабатывать другие форматы, такие как JSON.
  • Нет поддержки дженериков
  • Разделение поддерживает только один шаг
  • API не предлагает ничего для поддержки планирования (хотя J2EE имеет отдельный модуль планирования).
  • Из-за асинхронного характера тестирование может быть сложной задачей.
  • API довольно подробный

13. Заключение

В этой статье мы рассмотрели JSR 352 и узнали о чанках, пакетах, разбиениях, потоках и многом другом. Тем не менее, мы едва поцарапали поверхность.

Как всегда, демо-код можно найти на GitHub .