Перейти к основному содержимому

Поддержка транзакций в Spring Integration

· 5 мин. чтения

1. Обзор

В этом руководстве мы рассмотрим поддержку транзакций в среде интеграции Spring .

2. Транзакции в потоках сообщений

Spring обеспечивает поддержку синхронизации ресурсов с транзакциями с самых ранних версий. Мы часто используем его для синхронизации транзакций, управляемых несколькими менеджерами транзакций.

Например, мы можем синхронизировать фиксацию JMS с фиксацией JDBC .

С другой стороны, у нас также есть более сложные варианты использования в потоках сообщений. Они включают синхронизацию нетранзакционных ресурсов, а также различных типов транзакционных ресурсов.

Как правило, потоки обмена сообщениями могут инициироваться двумя различными типами механизмов.

2.1. Потоки сообщений, инициированные пользовательским процессом

Некоторые потоки сообщений зависят от инициирования сторонних процессов, таких как запуск сообщения в каком-либо канале сообщений или вызов метода шлюза сообщений.

Мы настраиваем поддержку транзакций для этих потоков через стандартную поддержку транзакций Spring . Потоки не должны быть явно настроены Spring Integration для поддержки транзакций. Поток сообщений Spring Integration естественным образом учитывает транзакционную семантику компонентов Spring.

Например, мы можем аннотировать ServiceActivator или его метод с помощью @Transactional :

@Transactional
public class TxServiceActivator {

@Autowired
private JdbcTemplate jdbcTemplate;

public void storeTestResult(String testResult) {
this.jdbcTemplate.update("insert into STUDENT values(?)", testResult);
log.info("Test result is stored: {}", testResult);
}
}

Мы можем запустить метод storeTestResult из любого компонента, и транзакционный контекст будет применяться как обычно. При таком подходе у нас есть полный контроль над конфигурацией транзакции.

2.2. Потоки сообщений, инициированные процессом демона

Мы часто используем этот тип потока сообщений для автоматизации. Например, опросчик опрашивает очередь сообщений, чтобы инициировать новый поток сообщений с опрошенным сообщением, или планировщик, планирующий процесс путем создания нового сообщения и инициирования потока сообщений в заранее определенное время.

По сути, это потоки на основе триггеров, инициированные триггерным процессом (процессом демона). Для этих потоков мы должны предоставить некоторую конфигурацию транзакции для создания контекста транзакции всякий раз, когда начинается новый поток сообщений.

Через конфигурацию мы делегируем потоки существующей поддержке транзакций Spring.

Мы сосредоточимся на поддержке транзакций для этого типа потока сообщений в оставшейся части статьи.

3. Поддержка транзакций опроса

Опросчик является распространенным компонентом в потоках интеграции. Он периодически извлекает данные из различных источников и передает их по цепочке интеграции.

Spring Integration обеспечивает поддержку транзакций для опросников из коробки. Каждый раз, когда мы настраиваем компонент Poller , мы можем предоставить транзакционную конфигурацию:

@Bean
@InboundChannelAdapter(value = "someChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> someMessageSource() {
...
}

@Bean
public PollerMetadata pollerMetadata() {
return Pollers.fixedDelay(5000)
.advice(transactionInterceptor())
.transactionSynchronizationFactory(transactionSynchronizationFactory)
.get();
}

private TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder()
.transactionManager(txManager)
.build();
}

Мы должны предоставить ссылку на TransactionManager и пользовательскую TransactionSynchronizationFactory , или мы можем полагаться на значения по умолчанию. Внутри собственная транзакция Spring оборачивает процесс. В результате все потоки сообщений, инициированные этим средством опроса, являются транзакционными.

4. Границы сделки

Когда транзакция запускается, контекст транзакции всегда привязывается к текущему потоку. Независимо от того, сколько конечных точек и каналов у нас может быть в нашем потоке сообщений, наш контекст транзакции всегда будет сохраняться, пока поток живет в том же потоке.

Если мы нарушим его, инициировав новый поток в каком-либо сервисе, мы также нарушим транзакционную границу. По сути, транзакция завершится в этот момент.

Если между потоками произошла успешная передача обслуживания, поток будет считаться успешным. Это зафиксирует транзакцию в этот момент, но поток будет продолжаться, и это все равно может привести к исключению где- то ниже по течению.

Следовательно, это исключение может вернуться к инициатору потока, так что транзакция может закончиться откатом. Вот почему мы должны использовать транзакционные каналы в любой точке, где может быть нарушена граница потока .

Например, мы должны использовать JMS, JDBC или какой-либо другой транзакционный канал.

5. Синхронизация транзакций

В некоторых случаях полезно синхронизировать определенные операции с транзакцией, охватывающей весь поток.

Например, мы покажем, как использовать опросчик , который считывает входящий файл и на основе его содержимого выполняет обновление базы данных. Когда операция базы данных завершается, она также переименовывает файл в зависимости от успеха операции.

Прежде чем мы перейдем к примеру, важно понять, что этот подход синхронизирует операции в файловой системе с транзакцией. Это не делает файловую систему, которая по своей сути не является транзакционной, на самом деле становится транзакционной.

Транзакция начинается до опроса и либо фиксируется, либо откатывается после завершения потока, за которым следует синхронизированная операция в файловой системе.

Во- первых, мы определяем InboundChannelAdapter с помощью простого Poller :

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}

@Bean
public PollerMetadata pollerMetadata() {
return Pollers.fixedDelay(5000)
.advice(transactionInterceptor())
.transactionSynchronizationFactory(transactionSynchronizationFactory)
.get();
}

`Как объяснялось ранее , Poller содержит ссылку на TransactionManager .Кроме того, он также содержит ссылку на TransactionSynchronizationFactory` . Этот компонент обеспечивает механизм синхронизации операций файловой системы с транзакцией:

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionEvaluatingTransactionSynchronizationProcessor processor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();

SpelExpressionParser spelParser = new SpelExpressionParser();

processor.setAfterCommitExpression(
spelParser.parseExpression(
"payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));

processor.setAfterRollbackExpression(
spelParser.parseExpression(
"payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));

return new DefaultTransactionSynchronizationFactory(processor);
}

Если транзакция фиксируется, TransactionSynchronizationFactory переименует файл, добавив «.PASSED» к имени файла. Однако, если он откатится, он добавит «.FAILED».

InputChannel преобразует полезную нагрузку с помощью FileToStringTransformer и делегирует ее toServiceChannel . Этот канал привязан к ServiceActivator :

@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}

@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}

ServiceActivator считывает входящий файл, содержащий результаты экзамена студента. Он записывает результат в базу данных. Если результат содержит строку «fail», он выдает Exception , что приводит к откату базы данных:

@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {

jdbcTemplate.update("insert into STUDENT values(?)", payload);

if (payload.toLowerCase().startsWith("fail")) {
log.error("Service failure. Test result: {} ", payload);
throw new RuntimeException("Service failure.");
}

log.info("Service success. Test result: {}", payload);
}

После успешной фиксации или отката операции базы данных TransactionSynchronizationFactory синхронизирует операцию файловой системы с ее результатом.

6. Заключение

В этой статье мы объяснили поддержку транзакций в среде Spring Integration . Кроме того, мы продемонстрировали, как синхронизировать транзакцию с операциями над нетранзакционным ресурсом, таким как файловая система.

Полный исходный код примера доступен на GitHub.