Перейти к основному содержимому

Введение в Spring Batch

· 6 мин. чтения

Задача: Медиана двух отсортированных массивов

Даны два отсортированных массива размерами n и m. Найдите медиану слияния этих двух массивов.
Временная сложность решения должна быть O(log(m + n)) ...

ANDROMEDA

1. Обзор

В этом руководстве мы рассмотрим практическое введение в Spring Batch, ориентированное на код. Spring Batch — это платформа обработки, предназначенная для надежного выполнения заданий.

Текущая версия 4.3 поддерживает Spring 5 и Java 8. Она также поддерживает JSR-352, новую спецификацию Java для пакетной обработки.

Вот несколько интересных и практических вариантов использования фреймворка.

2. Основы рабочего процесса

Spring Batch следует традиционной пакетной архитектуре, в которой репозиторий заданий выполняет работу по планированию и взаимодействию с заданием.

Работа может состоять из более чем одного шага. И каждый шаг обычно следует последовательности чтения данных, их обработки и записи.

И, конечно же, фреймворк сделает за нас большую часть тяжелой работы — особенно когда речь идет о работе с низкоуровневым сохранением работы с заданиями — используя sqlite для репозитория заданий.

2.1. Пример использования

Простой вариант использования, который мы здесь рассмотрим, — перенос некоторых данных финансовых транзакций из CSV в XML.

Входной файл имеет очень простую структуру.

Он содержит транзакцию в каждой строке, состоящую из имени пользователя, идентификатора пользователя, даты транзакции и суммы:

username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

3. ПОМ Maven

Зависимости, необходимые для этого проекта, — это ядро Spring, пакет Spring и коннектор sqlite jdbc:

<!-- SQLite database driver -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.15.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.0</version>
</dependency>

4. Пакетная конфигурация Spring

Первое, что мы сделаем, это настроим Spring Batch с помощью XML:

<!-- connect to SQLite database -->
<bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="org.sqlite.JDBC" />
<property name="url" value="jdbc:sqlite:repository.sqlite" />
<property name="username" value="" />
<property name="password" value="" />
</bean>

<!-- create job-meta tables automatically -->
<jdbc:initialize-database data-source="dataSource">
<jdbc:script
location="org/springframework/batch/core/schema-drop-sqlite.sql" />
<jdbc:script location="org/springframework/batch/core/schema-sqlite.sql" />
</jdbc:initialize-database>

<!-- stored job-meta in memory -->
<!--
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
-->

<!-- stored job-meta in database -->
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="transactionManager" ref="transactionManager" />
<property name="databaseType" value="sqlite" />
</bean>

<bean id="transactionManager" class=
"org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>

Конечно, конфигурация Java также доступна:

@Configuration
@EnableBatchProcessing
public class SpringConfig {

@Value("org/springframework/batch/core/schema-drop-sqlite.sql")
private Resource dropReopsitoryTables;

@Value("org/springframework/batch/core/schema-sqlite.sql")
private Resource dataReopsitorySchema;

@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}

@Bean
public DataSourceInitializer dataSourceInitializer(DataSource dataSource)
throws MalformedURLException {
ResourceDatabasePopulator databasePopulator =
new ResourceDatabasePopulator();

databasePopulator.addScript(dropReopsitoryTables);
databasePopulator.addScript(dataReopsitorySchema);
databasePopulator.setIgnoreFailedDrops(true);

DataSourceInitializer initializer = new DataSourceInitializer();
initializer.setDataSource(dataSource);
initializer.setDatabasePopulator(databasePopulator);

return initializer;
}

private JobRepository getJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(getTransactionManager());
factory.afterPropertiesSet();
return (JobRepository) factory.getObject();
}

private PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}

public JobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}

5. Конфигурация пакетного задания Spring

Давайте теперь напишем описание нашей работы для работы с CSV в XML:

<import resource="spring.xml" />

<bean id="record" class="com.foreach.spring_batch_intro.model.Transaction"></bean>
<bean id="itemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">

<property name="resource" value="input/record.csv" />

<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class=
"org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="username,userid,transactiondate,amount" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.foreach.spring_batch_intro.service.RecordFieldSetMapper" />
</property>
</bean>
</property>
</bean>

<bean id="itemProcessor"
class="com.foreach.spring_batch_intro.service.CustomItemProcessor" />

<bean id="itemWriter"
class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" />
<property name="marshaller" ref="recordMarshaller" />
<property name="rootTagName" value="transactionRecord" />
</bean>

<bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound">
<list>
<value>com.foreach.spring_batch_intro.model.Transaction</value>
</list>
</property>
</bean>
<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>

А вот аналогичная конфигурация задания на основе Java:

public class SpringBatchConfig {

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

@Value("input/record.csv")
private Resource inputCsv;

@Value("file:xml/output.xml")
private Resource outputXml;

@Bean
public ItemReader<Transaction> itemReader()
throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = { "username", "userid", "transactiondate", "amount" };
tokenizer.setNames(tokens);
reader.setResource(inputCsv);
DefaultLineMapper<Transaction> lineMapper =
new DefaultLineMapper<Transaction>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLineMapper(lineMapper);
return reader;
}

@Bean
public ItemProcessor<Transaction, Transaction> itemProcessor() {
return new CustomItemProcessor();
}

@Bean
public ItemWriter<Transaction> itemWriter(Marshaller marshaller)
throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter =
new StaxEventItemWriter<Transaction>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(outputXml);
return itemWriter;
}

@Bean
public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(new Class[] { Transaction.class });
return marshaller;
}

@Bean
protected Step step1(ItemReader<Transaction> reader,
ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) {
return steps.get("step1").<Transaction, Transaction> chunk(10)
.reader(reader).processor(processor).writer(writer).build();
}

@Bean(name = "firstBatchJob")
public Job job(@Qualifier("step1") Step step1) {
return jobs.get("firstBatchJob").start(step1).build();
}
}

Теперь, когда у нас есть вся конфигурация, давайте разберем ее и начнем обсуждать.

5.1. Чтение данных и создание объектов с помощью ItemReader

Во-первых, мы настроили cvsFileItemReader , который будет считывать данные из record.csv и преобразовывать их в объект Transaction :

@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
private String username;
private int userId;
private LocalDateTime transactionDate;
private double amount;

/* getters and setters for the attributes */

@Override
public String toString() {
return "Transaction [username=" + username + ", userId=" + userId
+ ", transactionDate=" + transactionDate + ", amount=" + amount
+ "]";
}
}

Для этого он использует собственный маппер:

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/M/yyy");
Transaction transaction = new Transaction();

transaction.setUsername(fieldSet.readString("username"));
transaction.setUserId(fieldSet.readInt(1));
transaction.setAmount(fieldSet.readDouble(3));
String dateString = fieldSet.readString(2);
transaction.setTransactionDate(LocalDate.parse(dateString, formatter).atStartOfDay());
return transaction;
}
}

5.2. Обработка данных с помощью ItemProcessor

Мы создали собственный обработчик элементов CustomItemProcessor . Это не обрабатывает ничего, связанного с объектом транзакции.

Все, что он делает, это передает исходный объект, поступающий от читателя к писателю:

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

public Transaction process(Transaction item) {
return item;
}
}

5.3. Запись объектов в ФС с помощью ItemWriter

Наконец, мы собираемся сохранить эту транзакцию в файле XML, расположенном по адресу xml/output.xml :

<bean id="itemWriter"
class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" />
<property name="marshaller" ref="recordMarshaller" />
<property name="rootTagName" value="transactionRecord" />
</bean>

5.4. Настройка пакетного задания

Итак, все, что нам нужно сделать, это соединить точки с заданием, используя синтаксис batch:job .

Обратите внимание на интервал фиксации . Это количество транзакций, которое должно храниться в памяти перед фиксацией пакета в itemWriter .

Он будет хранить транзакции в памяти до этого момента (или до тех пор, пока не встретится конец входных данных):

<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>

5.5. Запуск пакетного задания

Теперь давайте все настроим и запустим:

public class App {
public static void main(String[] args) {
// Spring Java config
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(SpringConfig.class);
context.register(SpringBatchConfig.class);
context.refresh();

JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("firstBatchJob");
System.out.println("Starting the batch job");
try {
JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Job Status : " + execution.getStatus());
System.out.println("Job completed");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Job failed");
}
}
}

6. Заключение

В этой статье показано , как работать с Spring Batch и как использовать его в простом сценарии использования.

Он показывает, как мы можем легко разработать наш конвейер пакетной обработки и как мы можем настроить различные этапы чтения, обработки и записи.

Полную реализацию этой статьи можно найти в проекте GitHub . Это проект на основе Eclipse, поэтому его легко импортировать и запускать как есть.