Перейти к основному содержимому

Spring Data с реактивной Cassandra

· 4 мин. чтения

1. Введение

В этом руководстве мы узнаем, как использовать функции реактивного доступа к данным Spring Data Cassandra.

В частности, это третья статья из серии статей Spring Data Cassandra. В этом мы предоставим базу данных Cassandra с помощью REST API.

Подробнее о Spring Data Cassandra мы можем прочитать в первой и второй статьях серии.

2. Зависимости Maven

На самом деле Spring Data Cassandra поддерживает реактивные типы Project Reactor и RxJava. Для демонстрации в этом руководстве мы будем использовать реактивные типы Flux и Mono реактора Project .

Для начала давайте добавим зависимости, необходимые для нашего руководства:

<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

Последнюю версию spring-data-cassandra можно найти здесь .

Теперь мы собираемся предоставить операции SELECT из базы данных через REST API. Итак, давайте также добавим зависимость для RestController :

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

3. Внедрение нашего приложения

Поскольку мы будем сохранять данные, давайте сначала определим наш объект сущности :

@Table
public class Employee {
@PrimaryKey
private int id;
private String name;
private String address;
private String email;
private int age;
}

Затем пришло время создать EmployeeRepository , который расширяется от ReactiveCassandraRepository. Важно отметить, что этот интерфейс включает поддержку реактивных типов :

public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {
@AllowFiltering
Flux<Employee> findByAgeGreaterThan(int age);
}

3.1. Rest Controller для операций CRUD

В целях иллюстрации мы представим некоторые основные операции SELECT с использованием простого Rest Controller:

@RestController
@RequestMapping("employee")
public class EmployeeController {

@Autowired
EmployeeService employeeService;

@PostConstruct
public void saveEmployees() {
List<Employee> employees = new ArrayList<>();
employees.add(new Employee(123, "John Doe", "Delaware", "jdoe@xyz.com", 31));
employees.add(new Employee(324, "Adam Smith", "North Carolina", "asmith@xyz.com", 43));
employees.add(new Employee(355, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24));
employees.add(new Employee(643, "Mike Lauren", "New York", "mlauren@xyz.com", 41));
employeeService.initializeEmployees(employees);
}

@GetMapping("/list")
public Flux<Employee> getAllEmployees() {
Flux<Employee> employees = employeeService.getAllEmployees();
return employees;
}

@GetMapping("/{id}")
public Mono<Employee> getEmployeeById(@PathVariable int id) {
return employeeService.getEmployeeById(id);
}

@GetMapping("/filterByAge/{age}")
public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {
return employeeService.getEmployeesFilterByAge(age);
}
}

Наконец, давайте добавим простой EmployeeService :

@Service
public class EmployeeService {

@Autowired
EmployeeRepository employeeRepository;

public void initializeEmployees(List<Employee> employees) {
Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);
savedEmployees.subscribe();
}

public Flux<Employee> getAllEmployees() {
Flux<Employee> employees = employeeRepository.findAll();
return employees;
}

public Flux<Employee> getEmployeesFilterByAge(int age) {
return employeeRepository.findByAgeGreaterThan(age);
}

public Mono<Employee> getEmployeeById(int id) {
return employeeRepository.findById(id);
}
}

3.2. Конфигурация базы данных

Затем давайте укажем пространство ключей и порт для подключения к Cassandra в application.properties :

spring.data.cassandra.keyspace-name=practice
spring.data.cassandra.port=9042
spring.data.cassandra.local-datacenter=datacenter1

Примечание. datacenter1 — это имя центра обработки данных по умолчанию.

4. Тестирование конечных точек

Наконец, пришло время протестировать наши конечные точки API.

4.1. Ручное тестирование

Для начала возьмем записи о сотрудниках из базы данных:

curl localhost:8080/employee/list

В итоге получаем всех сотрудников:

[
{
"id": 324,
"name": "Adam Smith",
"address": "North Carolina",
"email": "asmith@xyz.com",
"age": 43
},
{
"id": 123,
"name": "John Doe",
"address": "Delaware",
"email": "jdoe@xyz.com",
"age": 31
},
{
"id": 355,
"name": "Kevin Dunner",
"address": "Virginia",
"email": "kdunner@xyz.com",
"age": 24
},
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "mlauren@xyz.com",
"age": 41
}
]

Далее попробуем найти конкретного сотрудника по его id:

curl localhost:8080/employee/643

В итоге получаем мистера Майка Лорена обратно:

{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "mlauren@xyz.com",
"age": 41
}

Наконец, давайте посмотрим, работает ли наш возрастной фильтр:

curl localhost:8080/employee/filterByAge/35

И, как и ожидалось, получаем всех сотрудников, чей возраст больше 35 лет:

[
{
"id": 324,
"name": "Adam Smith",
"address": "North Carolina",
"email": "asmith@xyz.com",
"age": 43
},
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "mlauren@xyz.com",
"age": 41
}
]

4.2. Интеграционное тестирование

Кроме того, давайте проверим ту же функциональность, написав тестовый пример:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveEmployeeRepositoryIntegrationTest {

@Autowired
EmployeeRepository repository;

@Before
public void setUp() {
Flux<Employee> deleteAndInsert = repository.deleteAll()
.thenMany(repository.saveAll(Flux.just(
new Employee(111, "John Doe", "Delaware", "jdoe@xyz.com", 31),
new Employee(222, "Adam Smith", "North Carolina", "asmith@xyz.com", 43),
new Employee(333, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24),
new Employee(444, "Mike Lauren", "New York", "mlauren@xyz.com", 41))));

StepVerifier
.create(deleteAndInsert)
.expectNextCount(4)
.verifyComplete();
}

@Test
public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
Mono<Long> saveAndCount = repository.count()
.doOnNext(System.out::println)
.thenMany(repository
.saveAll(Flux.just(
new Employee(325, "Kim Jones", "Florida", "kjones@xyz.com", 42),
new Employee(654, "Tom Moody", "New Hampshire", "tmoody@xyz.com", 44))))
.last()
.flatMap(v -> repository.count())
.doOnNext(System.out::println);

StepVerifier
.create(saveAndCount)
.expectNext(6L)
.verifyComplete();
}

@Test
public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {
StepVerifier
.create(repository.findByAgeGreaterThan(35))
.expectNextCount(2)
.verifyComplete();
}
}

5. Вывод

Таким образом, мы узнали, как использовать реактивные типы с помощью Spring Data Cassandra для создания неблокирующего приложения.

Как всегда, ознакомьтесь с исходным кодом этого руководства на GitHub .