Перейти к основному содержимому

R2DBC — реактивное подключение к реляционной базе данных

· 10 мин. чтения

1. Обзор

В этом руководстве мы покажем, как мы можем использовать R2DBC для выполнения операций с базой данных реактивным способом .

Чтобы изучить R2DBC, мы создадим простое приложение Spring WebFlux REST, которое реализует операции CRUD для одного объекта, используя для достижения этой цели только асинхронные операции.

2. Что такое R2DBC ?

Реактивная разработка находится на подъеме: новые фреймворки появляются каждый день, а существующие получают все большее распространение. Однако основной проблемой реактивной разработки является тот факт, что доступ к базе данных в мире Java/JVM остается в основном синхронным . Это прямое следствие того, как был разработан JDBC, и привело к некоторым уродливым хакам для адаптации этих двух принципиально разных подходов.

Чтобы удовлетворить потребность в асинхронном доступе к базе данных в мире Java, появились два стандарта. Первый из них, ADBC (API для асинхронного доступа к базе данных), поддерживается Oracle, но на момент написания этой статьи кажется, что он несколько застопорился и не имеет четкой временной шкалы.

Второй, который мы рассмотрим здесь, — это R2DBC (Reactive Relational Database Connectivity), работа сообщества, возглавляемая командой из Pivotal и других компаний. Этот проект, который все еще находится в стадии бета-тестирования, продемонстрировал большую жизнеспособность и уже предоставляет драйверы для баз данных Postgres, H2 и MSSQL.

3. Настройка проекта

Использование R2DBC в проекте требует добавления зависимостей к основному API и подходящего драйвера. В нашем примере мы будем использовать H2, так что это означает всего две зависимости:

<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>0.8.0.M7</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>0.8.0.M7</version>
</dependency>

В Maven Central пока нет артефактов R2DBC, поэтому нам также нужно добавить в наш проект пару репозиториев Spring:

<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

4. Заводская настройка соединений

Первое, что нам нужно сделать, чтобы получить доступ к базе данных с помощью R2DBC, — это создать объект ConnectionFactory , который играет роль, аналогичную DataSource JDBC. Самый простой способ создать ConnectionFactory — использовать класс ConnectionFactory .

Этот класс имеет статические методы, которые принимают объект ConnectionFactoryOptions и возвращают ConnectionFactory. Поскольку нам понадобится только один экземпляр нашей ConnectionFactory , давайте создадим @Bean , который мы сможем позже использовать через инъекцию везде, где нам нужно:

@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
if (!StringUtil.isNullOrEmpty(properties.getUser())) {
ob = ob.option(USER, properties.getUser());
}
if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
ob = ob.option(PASSWORD, properties.getPassword());
}
return ConnectionFactories.get(ob.build());
}

Здесь мы берем параметры, полученные от вспомогательного класса, украшенного аннотацией @ConfigurationProperties, и заполняем наш экземпляр ConnectionFactoryOptions . Чтобы заполнить его, R2DBC реализует шаблон построителя с одним методом option , который принимает Option и значение.

R2DBC определяет ряд хорошо известных опций, таких как USERNAME и PASSWORD , которые мы использовали выше. Другой способ установить эти параметры — передать строку соединения методу parse() класса ConnectionFactoryOptions .

Вот пример типичного URL-адреса подключения R2DBC:

r2dbc:h2:mem://./testdb

Разобьем эту строку на составляющие:

  • r2dbc : идентификатор фиксированной схемы для URL-адресов R2DBC — другая допустимая схема — rd2bcs , используемая для защищенных SSL-соединений .
  • h2 : идентификатор драйвера, используемый для поиска соответствующей фабрики соединений.
  • mem : протокол, специфичный для драйвера — в нашем случае это соответствует базе данных в памяти.
  • //./testdb : Строка, специфичная для драйвера, обычно содержащая хост, базу данных и любые дополнительные параметры.

Когда у нас есть готовый набор опций, мы передаем его в метод статической фабрики get() для создания нашего bean-компонента ConnectionFactory .

5. Выполнение операторов

Подобно JDBC, использование R2DBC в основном связано с отправкой операторов SQL в базу данных и обработкой наборов результатов. Однако, поскольку R2DBC является реактивным API, он сильно зависит от типов реактивных потоков, таких как Publisher и Subscriber .

Использование этих типов напрямую немного громоздко, поэтому мы будем использовать типы реактора проекта, такие как Mono и Flux , которые помогут нам писать более чистый и лаконичный код.

В следующих разделах мы увидим, как реализовать задачи, связанные с базой данных, путем создания реактивного класса DAO для простого класса Account . Этот класс содержит всего три свойства и имеет соответствующую таблицу в нашей базе данных:

public class Account {
private Long id;
private String iban;
private BigDecimal balance;
// ... getters and setters omitted
}

5.1. Получение соединения

Прежде чем мы сможем отправлять какие-либо операторы в базу данных, нам нужен экземпляр Connection . Мы уже видели, как создать ConnectionFactory , поэтому неудивительно, что мы будем использовать его для получения Connection . Что мы должны помнить, так это то, что теперь вместо обычного Connection мы получаем Publisher одного Connection.

Наш ReactiveAccountDao, который является обычным Spring @Component , получает свою ConnectionFactory через внедрение конструктора, поэтому он легко доступен в методах обработчика.

Давайте посмотрим на первые пару строк метода findById() , чтобы увидеть, как получить и начать использовать Connection :

public Mono<Account>> findById(Long id) {         
return Mono.from(connectionFactory.create())
.flatMap(c ->
// use the connection
)
// ... downstream processing omitted
}

Здесь мы адаптируем Publisher , возвращенный из нашей ConnectionFactory , в Mono , который является исходным источником для нашего потока событий.

5.1. Подготовка и подача заявлений

Теперь, когда у нас есть Connection , давайте воспользуемся им для создания оператора и привяжем к нему параметр:

.flatMap( c -> 
Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
.bind("$1", id)
.execute())
.doFinally((st) -> close(c))
)

Метод createStatement объекта Connection принимает строку SQL-запроса, которая может дополнительно содержать заполнители привязки, называемые в спецификации «маркерами» . ``

Пара замечаний: во-первых, createStatement — это синхронная операция , которая позволяет нам использовать плавный стиль для привязки значений к возвращаемому оператору; во-вторых, и это очень важно, синтаксис заполнителя/маркера зависит от поставщика!

В этом примере мы используем специальный синтаксис H2, который использует $n для обозначения параметров. Другие поставщики могут использовать другой синтаксис, например :param , @Pn или какое-то другое соглашение. Это важный аспект, на который мы должны обратить внимание при переносе устаревшего кода на этот новый API .

Сам процесс привязки довольно прост из-за плавного шаблона API и упрощенного ввода: есть только один перегруженный метод bind() , который заботится обо всех преобразованиях ввода — конечно, в соответствии с правилами базы данных.

Первый параметр, передаваемый в bind() , может быть порядковым номером, начинающимся с нуля, который соответствует положению маркера в операторе, или может быть строкой с фактическим маркером.

После того, как мы установили значения для всех параметров, мы вызываем execute() , который возвращает Publisher объектов Result , которые мы снова переносим в Mono для дальнейшей обработки. Мы присоединяем обработчик doFinally() к этому Mono , чтобы убедиться, что мы закроем наше соединение, независимо от того, завершится ли обработка потока нормально или нет.

5.2. Обработка результатов

Следующий шаг в нашем конвейере отвечает за обработку объектов Result и создание потока экземпляров ResponseEntity< Account> .

Поскольку мы знаем, что может быть только один экземпляр с данным id , мы фактически вернем поток Mono . Фактическое преобразование происходит внутри функции, переданной методу map() полученного Result :

.map(result -> result.map((row, meta) -> 
new Account(row.get("id", Long.class),
row.get("iban", String.class),
row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));

Метод map() результата ожидает функцию, которая принимает два параметра. Первый — это объект Row , который мы используем для сбора значений для каждого столбца и заполнения экземпляра Account . Второй, meta , представляет собой объект RowMetadata , который содержит информацию о текущей строке, такую как имена и типы столбцов.

Предыдущий вызов map() в нашем конвейере разрешается в Mono<Producer<Account>> , но нам нужно вернуть Mono<Account> из этого метода. Чтобы исправить это, мы добавляем последний шаг flatMap() , который адаптирует Producer к Mono.

5.3. Пакетные отчеты

R2DBC также поддерживает создание и выполнение пакетов операторов, что позволяет выполнять несколько операторов SQL в одном вызове execute() . В отличие от обычных операторов пакетные операторы не поддерживают привязку и в основном используются для повышения производительности в таких сценариях, как задания ETL.

В нашем примере проекта используется пакет операторов для создания таблицы Account и вставки в нее некоторых тестовых данных:

@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
return (args) ->
Flux.from(cf.create())
.flatMap(c ->
Flux.from(c.createBatch()
.add("drop table if exists Account")
.add("create table Account(" +
"id IDENTITY(1,1)," +
"iban varchar(80) not null," +
"balance DECIMAL(18,2) not null)")
.add("insert into Account(iban,balance)" +
"values('BR430120980198201982',100.00)")
.add("insert into Account(iban,balance)" +
"values('BR430120998729871000',250.00)")
.execute())
.doFinally((st) -> c.close())
)
.log()
.blockLast();
}

Здесь мы используем пакет , возвращенный функцией createBatch() , и добавляем несколько операторов SQL. Затем мы отправляем эти инструкции для выполнения, используя тот же метод execute() , доступный в интерфейсе Statement .

В данном конкретном случае нас не интересуют какие-либо результаты, а только то, что все операторы выполняются нормально. Если бы нам понадобились какие-либо полученные результаты, все, что нам нужно было сделать, это добавить в этот поток следующий шаг для обработки сгенерированных объектов Result .

6. Транзакции

Последняя тема, которую мы рассмотрим в этом руководстве, — это транзакции. Как и следовало ожидать, мы управляем транзакциями так же, как в JDBC, то есть с помощью методов, доступных в объекте Connection .

Как и прежде, основное отличие состоит в том, что теперь все методы, связанные с транзакциями, являются асинхронными , возвращая Publisher , который мы должны добавить в наш поток в соответствующих точках.

Наш пример проекта использует транзакцию в реализации метода createAccount() :

public Mono<Account> createAccount(Account account) {    
return Mono.from(connectionFactory.create())
.flatMap(c -> Mono.from(c.beginTransaction())
.then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
.bind("$1", account.getIban())
.bind("$2", account.getBalance())
.returnGeneratedValues("id")
.execute()))
.map(result -> result.map((row, meta) ->
new Account(row.get("id", Long.class),
account.getIban(),
account.getBalance())))
.flatMap(pub -> Mono.from(pub))
.delayUntil(r -> c.commitTransaction())
.doFinally((st) -> c.close()));
}

Здесь мы добавили вызовы, связанные с транзакциями, в двух точках. Во-первых, сразу после получения нового соединения из базы данных мы вызываем метод beginTransactionMethod() . Как только мы узнаем, что транзакция была успешно запущена, мы подготавливаем и выполняем оператор вставки .

На этот раз мы также использовали метод returnGeneratedValues() , чтобы указать базе данных вернуть значение идентификатора, сгенерированное для этой новой учетной записи . R2DBC возвращает эти значения в результате , содержащем одну строку со всеми сгенерированными значениями, которые мы используем для создания экземпляра учетной записи .

И снова нам нужно преобразовать входящий Mono<Publisher<Account>> в Mono<Account> , поэтому для решения этой проблемы мы добавляем flatMap() . Затем мы фиксируем транзакцию на шаге delayUntil() . Нам это нужно, потому что мы хотим убедиться, что возвращенный Аккаунт уже зафиксирован в базе данных.

Наконец, мы присоединяем к этому конвейеру шаг doFinally , который закрывает Connection , когда все события из возвращенного Mono используются.

7. Пример использования DAO

Теперь, когда у нас есть реактивный DAO, давайте воспользуемся им для создания простого приложения Spring WebFlux , чтобы продемонстрировать, как его использовать в типичном приложении. Поскольку этот фреймворк уже поддерживает реактивные конструкции, это становится тривиальной задачей. Например, давайте посмотрим на реализацию метода GET :

@RestController
public class AccountResource {
private final ReactiveAccountDao accountDao;

public AccountResource(ReactiveAccountDao accountDao) {
this.accountDao = accountDao;
}

@GetMapping("/accounts/{id}")
public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
return accountDao.findById(id)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
.switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
}
// ... other methods omitted
}

Здесь мы используем возвращенный Mono нашего DAO для создания ResponseEntity с соответствующим кодом состояния. Мы делаем это только потому, что нам нужен код состояния NOT_FOUND (404) `, когда нет учетной записи` с данным идентификатором.

8. Заключение

В этой статье мы рассмотрели основы реактивного доступа к базе данных с помощью R2DBC. Несмотря на то, что этот проект находится в зачаточном состоянии, он быстро развивается, и его выпуск намечен на начало 2020 года.

По сравнению с ADBA, который определенно не будет частью Java 12, R2DBC кажется более перспективным и уже предоставляет драйверы для нескольких популярных баз данных — Oracle здесь заметно отсутствует.

Как обычно, полный исходный код, используемый в этом руководстве, доступен на Github .