1. Введение
В этом руководстве мы впервые рассмотрим RSocket и то, как он обеспечивает взаимодействие клиент-сервер.
2. Что такое RSocket
?
RSocket — это двоичный протокол связи «точка-точка», предназначенный для использования в распределенных приложениях. В этом смысле он представляет собой альтернативу другим протоколам, таким как HTTP.
Полное сравнение RSocket с другими протоколами выходит за рамки этой статьи. Вместо этого мы сосредоточимся на ключевой особенности RSocket: его моделях взаимодействия.
RSocket предоставляет четыре модели взаимодействия. Имея это в виду, мы рассмотрим каждый из них на примере.
3. Зависимости Maven
RSocket нужны только две прямые зависимости для наших примеров:
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>0.11.13</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>0.11.13</version>
</dependency>
Зависимости rsocket-core и rsocket-transport-netty доступны на Maven Central.
Важно отметить, что библиотека RSocket часто использует реактивные потоки . В этой статье используются классы Flux
и Mono
, поэтому их базовое понимание будет полезно.
4. Настройка сервера
Во-первых, давайте создадим класс Server :
public class Server {
private final Disposable server;
public Server() {
this.server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
.transport(TcpServerTransport.create("localhost", TCP_PORT))
.start()
.subscribe();
}
public void dispose() {
this.server.dispose();
}
private class RSocketImpl extends AbstractRSocket {}
}
Здесь мы используем RSocketFactory
для настройки и прослушивания сокета TCP. Мы передаем наш пользовательский RSocketImpl
для обработки запросов от клиентов. Мы добавим методы в RSocketImpl
по мере продвижения .
Далее, чтобы запустить сервер, нам просто нужно создать его экземпляр:
Server server = new Server();
Один экземпляр сервера может обрабатывать несколько подключений . В результате всего один экземпляр сервера будет поддерживать все наши примеры.
Когда мы закончим, метод dispose
остановит сервер и освободит порт TCP.
4. Модели взаимодействия
4.1. Ответ на запрос
RSocket предоставляет модель «запрос/ответ» — каждый запрос получает один ответ.
Для этой модели мы создадим простую службу, которая возвращает сообщение клиенту.
Давайте начнем с добавления метода к нашему расширению AbstractRSocket,
RSocketImpl
:
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
return Mono.just(payload); // reflect the payload back to the sender
} catch (Exception x) {
return Mono.error(x);
}
}
Метод requestResponse
возвращает один результат для каждого запроса , как мы можем видеть по типу ответа Mono<Payload> .
Payload
— это класс, который содержит содержимое сообщения и метаданные . Он используется всеми моделями взаимодействия. Содержимое полезной нагрузки является двоичным, но есть удобные методы, поддерживающие содержимое на основе String .
Далее мы можем создать наш клиентский класс:
public class ReqResClient {
private final RSocket socket;
public ReqResClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public String callBlocking(String string) {
return socket
.requestResponse(DefaultPayload.create(string))
.map(Payload::getDataUtf8)
.block();
}
public void dispose() {
this.socket.dispose();
}
}
Клиент использует метод RSocketFactory.connect()
, чтобы инициировать сокетное соединение с сервером. Мы используем метод requestResponse
в сокете для отправки полезной нагрузки на сервер .
Наша полезная нагрузка содержит строку
, переданную клиенту. Когда приходит ответ Mono
<Payload>
, мы можем использовать метод getDataUtf8()
для доступа к содержимому String ответа.
Наконец, мы можем запустить интеграционный тест, чтобы увидеть запрос/ответ в действии. Мы отправим строку
на сервер и убедимся, что возвращается та же самая строка
:
@Test
public void whenSendingAString_thenRevceiveTheSameString() {
ReqResClient client = new ReqResClient();
String string = "Hello RSocket";
assertEquals(string, client.callBlocking(string));
client.dispose();
}
4.2. Выстрелил-забыл
В модели «выстрелил-забыл» клиент не получит ответа от сервера .
В этом примере клиент будет отправлять смоделированные измерения на сервер с интервалом в 50 мс. Сервер опубликует измерения.
Давайте добавим обработчик «выстрелил-забыл» на наш сервер в классе RSocketImpl
:
@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
dataPublisher.publish(payload); // forward the payload
return Mono.empty();
} catch (Exception x) {
return Mono.error(x);
}
}
Этот обработчик очень похож на обработчик запроса/ответа. Однако fireAndForget
возвращает Mono<Void>
вместо Mono<Payload>
.
DataPublisher является экземпляром
org.reactivestreams.Publisher
. Таким образом, он делает полезную нагрузку доступной для подписчиков. Мы воспользуемся этим в примере запроса/потока. ``
Далее мы создадим клиент типа «выстрелил-забыл»:
public class FireNForgetClient {
private final RSocket socket;
private final List<Float> data;
public FireNForgetClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
/** Send binary velocity (float) every 50ms */
public void sendData() {
data = Collections.unmodifiableList(generateData());
Flux.interval(Duration.ofMillis(50))
.take(data.size())
.map(this::createFloatPayload)
.flatMap(socket::fireAndForget)
.blockLast();
}
// ...
}
Настройка сокета точно такая же, как и раньше.
Метод sendData()
использует поток Flux
для отправки нескольких сообщений. Для каждого сообщения мы вызываем socket::fireAndForget
.
Нам нужно подписаться на ответ Mono<Void>
для каждого сообщения . Если мы забудем подписаться, то socket::fireAndForget
не будет выполняться.
Оператор flatMap
обеспечивает передачу ответов Void
подписчику, а оператор blockLast
действует как подписчик.
Мы собираемся дождаться следующего раздела, чтобы запустить тест «выстрелил и забыл». В этот момент мы создадим клиент запроса/потока для получения данных, которые были переданы клиентом типа «выстрелил-забыл».
4.3. Запрос/поток
В модели запрос/поток один запрос может получить несколько ответов . Чтобы увидеть это в действии, мы можем использовать пример «выстрелил-забыл». Для этого давайте запросим поток для получения измерений, которые мы отправили в предыдущем разделе.
Как и раньше, давайте начнем с добавления нового слушателя в RSocketImpl
на сервере:
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.from(dataPublisher);
}
Обработчик requestStream
возвращает поток Flux<Payload>
. Как мы помним из предыдущего раздела, обработчик fireAndForget
публиковал входящие данные в dataPublisher.
Теперь мы создадим поток Flux
, используя тот же самый dataPublisher
, что и источник события. При этом данные измерений будут асинхронно передаваться от нашего клиента типа «выстрелил и забыл» к нашему клиенту запроса/потока.
Далее создадим клиент запроса/потока:
public class ReqStreamClient {
private final RSocket socket;
public ReqStreamClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public Flux<Float> getDataStream() {
return socket
.requestStream(DefaultPayload.create(DATA_STREAM_NAME))
.map(Payload::getData)
.map(buf -> buf.getFloat())
.onErrorReturn(null);
}
public void dispose() {
this.socket.dispose();
}
}
Подключаемся к серверу так же, как и наши предыдущие клиенты.
В getDataStream()
мы используем socket.requestStream()
для получения потока Flux<Payload> с сервера . Из этого потока мы извлекаем значения с плавающей
запятой из двоичных данных. Наконец, поток возвращается вызывающей стороне, что позволяет вызывающей стороне подписаться на него и обработать результаты.
Теперь давайте тестировать. Мы проверим путь от «выстрелил-забыл» до запроса/потока.
Мы можем утверждать, что каждое значение получено в том же порядке, в котором оно было отправлено. Затем мы можем утверждать, что получаем то же количество значений, которое было отправлено:
@Test
public void whenSendingStream_thenReceiveTheSameStream() {
FireNForgetClient fnfClient = new FireNForgetClient();
ReqStreamClient streamClient = new ReqStreamClient();
List<Float> data = fnfClient.getData();
List<Float> dataReceived = new ArrayList<>();
Disposable subscription = streamClient.getDataStream()
.index()
.subscribe(
tuple -> {
assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
dataReceived.add(tuple.getT2());
},
err -> LOG.error(err.getMessage())
);
fnfClient.sendData();
// ... dispose client & subscription
assertEquals("Wrong data count received", data.size(), dataReceived.size());
}
4.4. Канал
Канальная модель обеспечивает двунаправленную связь . В этой модели потоки сообщений проходят асинхронно в обоих направлениях.
Давайте создадим простую игровую симуляцию, чтобы проверить это. В этой игре каждая сторона канала станет игроком. По ходу игры эти игроки будут отправлять сообщения другой стороне через случайные промежутки времени. Противоположная сторона будет реагировать на сообщения.
Во-первых, мы создадим обработчик на сервере. Как и прежде, мы добавляем в RSocketImpl
:
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.subscribe(gameController::processPayload);
return Flux.from(gameController);
}
Обработчик requestChannel
имеет потоки полезной нагрузки
как для ввода, так и для вывода . Входной параметр Publisher<Payload>
— это поток полезных данных, полученных от клиента. По мере поступления эти полезные данные передаются функции gameController::processPayload
.
В ответ мы возвращаем клиенту другой поток Flux .
Этот поток создается из нашего gameController
, который также является Publisher
.
Вот краткое описание класса GameController
:
public class GameController implements Publisher<Payload> {
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
// send Payload messages to the subscriber at random intervals
}
public void processPayload(Payload payload) {
// react to messages from the other player
}
}
Когда GameController
получает подписчика, он начинает отправлять сообщения этому подписчику.
Далее создадим клиент:
public class ChannelClient {
private final RSocket socket;
private final GameController gameController;
public ChannelClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
this.gameController = new GameController("Client Player");
}
public void playGame() {
socket.requestChannel(Flux.from(gameController))
.doOnNext(gameController::processPayload)
.blockLast();
}
public void dispose() {
this.socket.dispose();
}
}
Как мы видели в наших предыдущих примерах, клиент подключается к серверу так же, как и другие клиенты.
Клиент создает свой собственный экземпляр GameController
.
Мы используем socket.requestChannel()
для отправки нашего потока полезной нагрузки
на сервер . Сервер отвечает собственным потоком полезной нагрузки.
Как полезные данные, полученные с сервера, мы передаем их нашему обработчику gameController::processPayload
.
В нашей игровой симуляции клиент и сервер являются зеркальным отображением друг друга. То есть каждая сторона отправляет поток полезной нагрузки
и получает поток полезной нагрузки
с другого конца .
Потоки работают независимо, без синхронизации.
Наконец, давайте запустим симуляцию в тесте:
@Test
public void whenRunningChannelGame_thenLogTheResults() {
ChannelClient client = new ChannelClient();
client.playGame();
client.dispose();
}
5. Вывод
В этой вводной статье мы рассмотрели модели взаимодействия, предоставляемые RSocket. Полный исходный код примеров можно найти в нашем репозитории Github .
Обязательно посетите веб- сайт RSocket для более глубокого обсуждения. В частности, документы FAQ и Motivations обеспечивают хорошую основу.