1. Обзор
В этом руководстве мы покажем, как мы можем использовать R2DBC для выполнения операций с базой данных реактивным способом .
Чтобы изучить R2DBC, мы создадим простое приложение Spring WebFlux REST, которое реализует операции CRUD для одного объекта, используя для достижения этой цели только асинхронные операции.
2. Что такое R2DBC
?
Реактивная разработка находится на подъеме: новые фреймворки появляются каждый день, а существующие получают все большее распространение. Однако основной проблемой реактивной разработки является тот факт, что доступ к базе данных в мире Java/JVM остается в основном синхронным . Это прямое следствие того, как был разработан JDBC, и привело к некоторым уродливым хакам для адаптации этих двух принципиально разных подходов.
Чтобы удовлетворить потребность в асинхронном доступе к базе данных в мире Java, появились два стандарта. Первый из них, ADBC (API для асинхронного доступа к базе данных), поддерживается Oracle, но на момент написания этой статьи кажется, что он несколько застопорился и не имеет четкой временной шкалы.
Второй, который мы рассмотрим здесь, — это R2DBC (Reactive Relational Database Connectivity), работа сообщества, возглавляемая командой из Pivotal и других компаний. Этот проект, который все еще находится в стадии бета-тестирования, продемонстрировал большую жизнеспособность и уже предоставляет драйверы для баз данных Postgres, H2 и MSSQL.
3. Настройка проекта
Использование R2DBC в проекте требует добавления зависимостей к основному API и подходящего драйвера. В нашем примере мы будем использовать H2, так что это означает всего две зависимости:
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>0.8.0.M7</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>0.8.0.M7</version>
</dependency>
В Maven Central пока нет артефактов R2DBC, поэтому нам также нужно добавить в наш проект пару репозиториев Spring:
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
4. Заводская настройка соединений
Первое, что нам нужно сделать, чтобы получить доступ к базе данных с помощью R2DBC, — это создать объект ConnectionFactory
, который играет роль, аналогичную DataSource JDBC.
Самый простой способ создать ConnectionFactory
— использовать класс ConnectionFactory .
Этот класс имеет статические методы, которые принимают объект ConnectionFactoryOptions
и возвращают ConnectionFactory.
Поскольку нам понадобится только один экземпляр нашей ConnectionFactory
, давайте создадим @Bean
, который мы сможем позже использовать через инъекцию везде, где нам нужно:
@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
if (!StringUtil.isNullOrEmpty(properties.getUser())) {
ob = ob.option(USER, properties.getUser());
}
if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
ob = ob.option(PASSWORD, properties.getPassword());
}
return ConnectionFactories.get(ob.build());
}
Здесь мы берем параметры, полученные от вспомогательного класса, украшенного аннотацией @ConfigurationProperties,
и заполняем наш экземпляр ConnectionFactoryOptions .
Чтобы заполнить его, R2DBC реализует шаблон построителя с одним методом option
, который принимает Option
и значение.
R2DBC определяет ряд хорошо известных опций, таких как USERNAME
и PASSWORD
, которые мы использовали выше. Другой способ установить эти параметры — передать строку соединения методу parse() класса
ConnectionFactoryOptions
.
Вот пример типичного URL-адреса подключения R2DBC:
r2dbc:h2:mem://./testdb
Разобьем эту строку на составляющие:
r2dbc
: идентификатор фиксированной схемы для URL-адресов R2DBC — другая допустимая схема —rd2bcs
, используемая для защищенных SSL-соединений .h2
: идентификатор драйвера, используемый для поиска соответствующей фабрики соединений.mem
: протокол, специфичный для драйвера — в нашем случае это соответствует базе данных в памяти.//./testdb
: Строка, специфичная для драйвера, обычно содержащая хост, базу данных и любые дополнительные параметры.
Когда у нас есть готовый набор опций, мы передаем его в метод статической фабрики get()
для создания нашего bean-компонента ConnectionFactory
.
5. Выполнение операторов
Подобно JDBC, использование R2DBC в основном связано с отправкой операторов SQL в базу данных и обработкой наборов результатов. Однако, поскольку R2DBC является реактивным API, он сильно зависит от типов реактивных потоков, таких как Publisher
и Subscriber
.
Использование этих типов напрямую немного громоздко, поэтому мы будем использовать типы реактора проекта, такие как Mono
и Flux
, которые помогут нам писать более чистый и лаконичный код.
В следующих разделах мы увидим, как реализовать задачи, связанные с базой данных, путем создания реактивного класса DAO для простого класса Account
. Этот класс содержит всего три свойства и имеет соответствующую таблицу в нашей базе данных:
public class Account {
private Long id;
private String iban;
private BigDecimal balance;
// ... getters and setters omitted
}
5.1. Получение соединения
Прежде чем мы сможем отправлять какие-либо операторы в базу данных, нам нужен экземпляр Connection
. Мы уже видели, как создать ConnectionFactory
, поэтому неудивительно, что мы будем использовать его для получения Connection
. Что мы должны помнить, так это то, что теперь вместо обычного Connection
мы получаем Publisher
одного Connection.
Наш ReactiveAccountDao,
который является обычным Spring @Component
, получает свою ConnectionFactory
через внедрение конструктора, поэтому он легко доступен в методах обработчика.
Давайте посмотрим на первые пару строк метода findById()
, чтобы увидеть, как получить и начать использовать Connection
:
public Mono<Account>> findById(Long id) {
return Mono.from(connectionFactory.create())
.flatMap(c ->
// use the connection
)
// ... downstream processing omitted
}
Здесь мы адаптируем Publisher
, возвращенный из нашей ConnectionFactory
, в Mono
, который является исходным источником для нашего потока событий.
5.1. Подготовка и подача заявлений
Теперь, когда у нас есть Connection
, давайте воспользуемся им для создания оператора и привяжем
к нему параметр:
.flatMap( c ->
Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
.bind("$1", id)
.execute())
.doFinally((st) -> close(c))
)
Метод createStatement объекта Connection
принимает
строку SQL-запроса, которая может дополнительно содержать заполнители привязки, называемые в спецификации «маркерами» . ``
Пара замечаний: во-первых, createStatement
— это синхронная операция , которая позволяет нам использовать плавный стиль для привязки значений к возвращаемому оператору;
во-вторых, и это очень важно, синтаксис заполнителя/маркера зависит от поставщика!
В этом примере мы используем специальный синтаксис H2, который использует $n
для обозначения параметров. Другие поставщики могут использовать другой синтаксис, например :param
, @Pn
или какое-то другое соглашение. Это важный аспект, на который мы должны обратить внимание при переносе устаревшего кода на этот новый API .
Сам процесс привязки довольно прост из-за плавного шаблона API и упрощенного ввода: есть только один перегруженный метод bind()
, который заботится обо всех преобразованиях ввода — конечно, в соответствии с правилами базы данных.
Первый параметр, передаваемый в bind()
, может быть порядковым номером, начинающимся с нуля, который соответствует положению маркера в операторе, или может быть строкой с фактическим маркером.
После того, как мы установили значения для всех параметров, мы вызываем execute()
, который возвращает Publisher
объектов Result
, которые мы снова переносим в Mono
для дальнейшей обработки. Мы присоединяем обработчик doFinally()
к этому Mono
, чтобы убедиться, что мы закроем наше соединение, независимо от того, завершится ли обработка потока нормально или нет.
5.2. Обработка результатов
Следующий шаг в нашем конвейере отвечает за обработку объектов Result
и создание потока экземпляров ResponseEntity<
Account>
.
Поскольку мы знаем, что может быть только один экземпляр с данным id
, мы фактически вернем поток Mono .
Фактическое преобразование происходит внутри функции, переданной методу map() полученного
Result
:
.map(result -> result.map((row, meta) ->
new Account(row.get("id", Long.class),
row.get("iban", String.class),
row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));
Метод map()
результата ожидает функцию, которая принимает два параметра. Первый — это объект Row
, который мы используем для сбора значений для каждого столбца и заполнения экземпляра Account .
Второй, meta
, представляет собой объект RowMetadata
, который содержит информацию о текущей строке, такую как имена и типы столбцов.
Предыдущий вызов map()
в нашем конвейере разрешается в Mono<Producer<Account>>
, но нам нужно вернуть Mono<Account>
из этого метода. Чтобы исправить это, мы добавляем последний шаг flatMap()
, который адаптирует Producer
к Mono.
5.3. Пакетные отчеты
R2DBC также поддерживает создание и выполнение пакетов операторов, что позволяет выполнять несколько операторов SQL в одном вызове execute() .
В отличие от обычных операторов пакетные операторы не поддерживают привязку и в основном используются для повышения производительности в таких сценариях, как задания ETL.
В нашем примере проекта используется пакет операторов для создания таблицы Account
и вставки в нее некоторых тестовых данных:
@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
return (args) ->
Flux.from(cf.create())
.flatMap(c ->
Flux.from(c.createBatch()
.add("drop table if exists Account")
.add("create table Account(" +
"id IDENTITY(1,1)," +
"iban varchar(80) not null," +
"balance DECIMAL(18,2) not null)")
.add("insert into Account(iban,balance)" +
"values('BR430120980198201982',100.00)")
.add("insert into Account(iban,balance)" +
"values('BR430120998729871000',250.00)")
.execute())
.doFinally((st) -> c.close())
)
.log()
.blockLast();
}
Здесь мы используем пакет
, возвращенный функцией createBatch()
, и добавляем несколько операторов SQL. Затем мы отправляем эти инструкции для выполнения, используя тот же метод execute()
, доступный в интерфейсе Statement .
В данном конкретном случае нас не интересуют какие-либо результаты, а только то, что все операторы выполняются нормально. Если бы нам понадобились какие-либо полученные результаты, все, что нам нужно было сделать, это добавить в этот поток следующий шаг для обработки сгенерированных объектов Result .
6. Транзакции
Последняя тема, которую мы рассмотрим в этом руководстве, — это транзакции. Как и следовало ожидать, мы управляем транзакциями так же, как в JDBC, то есть с помощью методов, доступных в объекте Connection .
Как и прежде, основное отличие состоит в том, что теперь все методы, связанные с транзакциями, являются асинхронными , возвращая Publisher
, который мы должны добавить в наш поток в соответствующих точках.
Наш пример проекта использует транзакцию в реализации метода createAccount()
:
public Mono<Account> createAccount(Account account) {
return Mono.from(connectionFactory.create())
.flatMap(c -> Mono.from(c.beginTransaction())
.then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
.bind("$1", account.getIban())
.bind("$2", account.getBalance())
.returnGeneratedValues("id")
.execute()))
.map(result -> result.map((row, meta) ->
new Account(row.get("id", Long.class),
account.getIban(),
account.getBalance())))
.flatMap(pub -> Mono.from(pub))
.delayUntil(r -> c.commitTransaction())
.doFinally((st) -> c.close()));
}
Здесь мы добавили вызовы, связанные с транзакциями, в двух точках. Во-первых, сразу после получения нового соединения из базы данных мы вызываем метод beginTransactionMethod()
. Как только мы узнаем, что транзакция была успешно запущена, мы подготавливаем и выполняем оператор вставки .
На этот раз мы также использовали метод returnGeneratedValues()
, чтобы указать базе данных вернуть значение идентификатора, сгенерированное для этой новой учетной записи
. R2DBC возвращает эти значения в результате
, содержащем одну строку со всеми сгенерированными значениями, которые мы используем для создания экземпляра учетной записи .
И снова нам нужно преобразовать входящий Mono<Publisher<Account>>
в Mono<Account>
, поэтому для решения этой проблемы мы добавляем flatMap()
.
Затем мы фиксируем транзакцию на шаге delayUntil()
. Нам это нужно, потому что мы хотим убедиться, что возвращенный Аккаунт
уже зафиксирован в базе данных.
Наконец, мы присоединяем к этому конвейеру шаг doFinally
, который закрывает Connection
, когда все события из возвращенного Mono
используются.
7. Пример использования DAO
Теперь, когда у нас есть реактивный DAO, давайте воспользуемся им для создания простого приложения Spring WebFlux , чтобы продемонстрировать, как его использовать в типичном приложении. Поскольку этот фреймворк уже поддерживает реактивные конструкции, это становится тривиальной задачей. Например, давайте посмотрим на реализацию метода GET
:
@RestController
public class AccountResource {
private final ReactiveAccountDao accountDao;
public AccountResource(ReactiveAccountDao accountDao) {
this.accountDao = accountDao;
}
@GetMapping("/accounts/{id}")
public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
return accountDao.findById(id)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
.switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
}
// ... other methods omitted
}
Здесь мы используем возвращенный Mono
нашего DAO для создания ResponseEntity
с соответствующим кодом состояния. Мы делаем это только потому, что нам нужен код состояния NOT_FOUND
(404) `, когда нет
учетной записи` с данным идентификатором.
8. Заключение
В этой статье мы рассмотрели основы реактивного доступа к базе данных с помощью R2DBC. Несмотря на то, что этот проект находится в зачаточном состоянии, он быстро развивается, и его выпуск намечен на начало 2020 года.
По сравнению с ADBA, который определенно не будет частью Java 12, R2DBC кажется более перспективным и уже предоставляет драйверы для нескольких популярных баз данных — Oracle здесь заметно отсутствует.
Как обычно, полный исходный код, используемый в этом руководстве, доступен на Github .