Перейти к основному содержимому

RSocket с использованием Spring Boot

· 6 мин. чтения

1. Обзор

RSocket — это прикладной протокол, обеспечивающий семантику Reactive Streams — он функционирует, например, как альтернатива HTTP.

В этом руководстве мы рассмотрим RSocket с помощью Spring Boot и, в частности, то, как это помогает абстрагироваться от низкоуровневого API RSocket.

2. Зависимости

Начнем с добавления зависимости spring-boot-starter-rsocket :

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

Это транзитивно подтянет зависимости, связанные с RSocket, такие как rsocket-core и rsocket-transport-netty .

3. Образец заявления

Теперь мы продолжим работу с нашим примером приложения. Чтобы выделить модели взаимодействия, предоставляемые RSocket, мы собираемся создать приложение трейдера. Наше приложение трейдера будет состоять из клиента и сервера.

3.1. Настройка сервера

Во-первых, давайте настроим сервер, который будет приложением Spring Boot, загружающим сервер RSocket.

Поскольку у нас есть зависимость spring-boot-starter-rsocket , Spring Boot автоматически настраивает для нас сервер RSocket. Как обычно с Spring Boot, мы можем изменить значения конфигурации по умолчанию для сервера RSocket в зависимости от свойств.

Например, давайте изменим порт нашего сервера RSocket, добавив следующую строку в наш файл application.properties :

spring.rsocket.server.port=7000

Мы также можем изменить другие свойства , чтобы дополнительно изменить наш сервер в соответствии с нашими потребностями.

3.2. Настройка клиента

Далее давайте настроим клиент, который также будет приложением Spring Boot.

Хотя Spring Boot автоматически настраивает большинство компонентов, связанных с RSocket, мы также должны определить некоторые bean-компоненты для завершения настройки:

@Configuration
public class ClientConfiguration {

@Bean
public RSocketRequester getRSocketRequester(){
RSocketRequester.Builder builder = RSocketRequester.builder();

return builder
.rsocketConnector(
rSocketConnector ->
rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))
)
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("localhost", 7000);
}
}

Здесь мы создаем клиент RSocket и настраиваем его для использования транспорта TCP на порту 7000. Обратите внимание, что это порт сервера, который мы настроили ранее.

После определения этой конфигурации bean-компонента у нас есть базовая структура.

Далее мы рассмотрим различные модели взаимодействия и посмотрим, как Spring Boot помогает нам в этом.

4. Запрос/ответ с RSocket и Spring Boot

Начнем с запроса/ответа. Это, вероятно, наиболее распространенная и знакомая модель взаимодействия, поскольку HTTP также использует этот тип связи.

В этой модели взаимодействия клиент инициирует связь и отправляет запрос. После этого сервер выполняет операцию и возвращает ответ клиенту — таким образом, связь завершается.

В нашем приложении трейдера клиент запросит текущие рыночные данные по данной акции. Взамен сервер передаст запрошенные данные.

4.1. Сервер

На стороне сервера мы должны сначала создать контроллер для хранения наших методов обработчика. Но вместо аннотаций @RequestMapping или @GetMapping , как в Spring MVC, мы будем использовать аннотацию @MessageMapping :

@Controller
public class MarketDataRSocketController {

private final MarketDataRepository marketDataRepository;

public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}

@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getOne(marketDataRequest.getStock());
}
}

Итак, давайте исследуем наш контроллер.

Мы используем аннотацию @ Controller для определения обработчика, который должен обрабатывать входящие запросы RSocket. Кроме того, аннотация @MessageMapping позволяет нам определить, какой маршрут нас интересует и как реагировать на запрос.

В этом случае сервер прослушивает маршрут currentMarketData , который возвращает клиенту единственный результат в виде Mono<MarketData> .

4.2. Клиент

Затем наш клиент RSocket должен запросить текущую цену акции и получить один ответ.

Чтобы инициировать запрос, мы должны использовать класс RSocketRequester :

@RestController
public class MarketDataRestController {

private final RSocketRequester rSocketRequester;

public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}

@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
return rSocketRequester
.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}

Обратите внимание, что в нашем случае клиент RSocket также является контроллером REST, из которого мы вызываем наш сервер RSocket. Итак, мы используем @RestController и @GetMapping для определения конечной точки запроса/ответа.

В методе конечной точки мы используем RSocketRequester и указываем маршрут. Фактически это маршрут, который ожидает сервер RSocket. Затем мы передаем данные запроса. И, наконец, когда мы вызываем метод retrieveMono() , Spring Boot инициирует взаимодействие запрос/ответ .

5. Выстрелил и забыл с RSocket и Spring Boot

Далее мы рассмотрим модель взаимодействия «выстрелил и забыл». Как следует из названия, клиент отправляет запрос на сервер, но не ожидает ответа.

В нашем приложении трейдера некоторые клиенты будут служить источником данных и передавать рыночные данные на сервер.

5.1. Сервер

Давайте создадим еще одну конечную точку в нашем серверном приложении:

@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
marketDataRepository.add(marketData);
return Mono.empty();
}

Опять же, мы определяем новый @MessageMapping со значением маршрута collectMarketData . Кроме того, Spring Boot автоматически преобразует входящую полезную нагрузку в экземпляр MarketData .

Однако большая разница здесь в том, что мы возвращаем Mono<Void> , так как клиенту не нужен наш ответ.

5.2. Клиент

Давайте посмотрим, как мы можем инициировать наш запрос «выстрелил-забыл».

Мы создадим еще одну конечную точку REST:

@GetMapping(value = "/collect")
public Publisher<Void> collect() {
return rSocketRequester
.route("collectMarketData")
.data(getMarketData())
.send();
}

Здесь мы указываем наш маршрут, и наша полезная нагрузка будет экземпляром MarketData . Поскольку мы используем метод send() для инициации запроса вместо retrieveMono() , модель взаимодействия становится «выстрелил-забыл» .

6. Запросить поток с помощью RSocket и Spring Boot

Потоковая передача запросов — это более сложная модель взаимодействия, при которой клиент отправляет запрос, но в течение времени получает несколько ответов от сервера.

Чтобы смоделировать эту модель взаимодействия, клиент запросит все рыночные данные по данной акции.

6.1. Сервер

Начнем с нашего сервера. Мы добавим еще один метод сопоставления сообщений:

@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getAll(marketDataRequest.getStock());
}

Как мы видим, этот метод-обработчик очень похож на другие. Другое дело , что мы возвращаем Flux<MarketData> вместо Mono<MarketData> . В конце концов, наш сервер RSocket отправит клиенту несколько ответов.

6.2. Клиент

На стороне клиента мы должны создать конечную точку для инициации нашего запроса/потока:

@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
return rSocketRequester
.route("feedMarketData")
.data(new MarketDataRequest(stock))
.retrieveFlux(MarketData.class);
}

Давайте исследуем наш запрос RSocket.

Во-первых, мы определяем маршрут и запрашиваем полезную нагрузку. Затем мы определяем ожидаемый ответ с помощью вызова метода retrieveFlux() . Это та часть, которая определяет модель взаимодействия.

Также обратите внимание, что, поскольку наш клиент также является сервером REST, он определяет тип носителя ответа как MediaType.TEXT_EVENT_STREAM_VALUE.

7. Обработка исключений

Теперь давайте посмотрим, как мы можем декларативно обрабатывать исключения в нашем серверном приложении.

При выполнении запроса/ответа мы можем просто использовать аннотацию @MessageExceptionHandler :

@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}

Здесь мы аннотировали наш метод обработчика исключений с помощью @MessageExceptionHandler . В результате он будет обрабатывать все типы исключений, поскольку класс Exception является надклассом всех остальных.

Мы можем быть более конкретными и создавать разные методы обработки исключений для разных типов исключений.

Это, конечно, для модели запрос/ответ, поэтому мы возвращаем Mono<MarketData>. Мы хотим, чтобы наш возвращаемый тип здесь соответствовал возвращаемому типу нашей модели взаимодействия.

8. Резюме

В этом руководстве мы рассмотрели поддержку RSocket в Spring Boot и подробно описали различные модели взаимодействия, предоставляемые RSocket.

Как всегда, вы можете ознакомиться со всеми примерами кода на GitHub .