Перейти к основному содержимому

Введение в RSocket

· 8 мин. чтения

1. Введение

В этом руководстве мы впервые рассмотрим RSocket и то, как он обеспечивает взаимодействие клиент-сервер.

2. Что такое RSocket ?

RSocket — это двоичный протокол связи «точка-точка», предназначенный для использования в распределенных приложениях. В этом смысле он представляет собой альтернативу другим протоколам, таким как HTTP.

Полное сравнение RSocket с другими протоколами выходит за рамки этой статьи. Вместо этого мы сосредоточимся на ключевой особенности RSocket: его моделях взаимодействия.

RSocket предоставляет четыре модели взаимодействия. Имея это в виду, мы рассмотрим каждый из них на примере.

3. Зависимости Maven

RSocket нужны только две прямые зависимости для наших примеров:

<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>0.11.13</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>0.11.13</version>
</dependency>

Зависимости rsocket-core и rsocket-transport-netty доступны на Maven Central.

Важно отметить, что библиотека RSocket часто использует реактивные потоки . В этой статье используются классы Flux и Mono , поэтому их базовое понимание будет полезно.

4. Настройка сервера

Во-первых, давайте создадим класс Server :

public class Server {
private final Disposable server;

public Server() {
this.server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
.transport(TcpServerTransport.create("localhost", TCP_PORT))
.start()
.subscribe();
}

public void dispose() {
this.server.dispose();
}

private class RSocketImpl extends AbstractRSocket {}
}

Здесь мы используем RSocketFactory для настройки и прослушивания сокета TCP. Мы передаем наш пользовательский RSocketImpl для обработки запросов от клиентов. Мы добавим методы в RSocketImpl по мере продвижения .

Далее, чтобы запустить сервер, нам просто нужно создать его экземпляр:

Server server = new Server();

Один экземпляр сервера может обрабатывать несколько подключений . В результате всего один экземпляр сервера будет поддерживать все наши примеры.

Когда мы закончим, метод dispose остановит сервер и освободит порт TCP.

4. Модели взаимодействия

4.1. Ответ на запрос

RSocket предоставляет модель «запрос/ответ» — каждый запрос получает один ответ.

Для этой модели мы создадим простую службу, которая возвращает сообщение клиенту.

Давайте начнем с добавления метода к нашему расширению AbstractRSocket, RSocketImpl :

@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
return Mono.just(payload); // reflect the payload back to the sender
} catch (Exception x) {
return Mono.error(x);
}
}

Метод requestResponse возвращает один результат для каждого запроса , как мы можем видеть по типу ответа Mono<Payload> .

Payload — это класс, который содержит содержимое сообщения и метаданные . Он используется всеми моделями взаимодействия. Содержимое полезной нагрузки является двоичным, но есть удобные методы, поддерживающие содержимое на основе String .

Далее мы можем создать наш клиентский класс:

public class ReqResClient {

private final RSocket socket;

public ReqResClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}

public String callBlocking(String string) {
return socket
.requestResponse(DefaultPayload.create(string))
.map(Payload::getDataUtf8)
.block();
}

public void dispose() {
this.socket.dispose();
}
}

Клиент использует метод RSocketFactory.connect() , чтобы инициировать сокетное соединение с сервером. Мы используем метод requestResponse в сокете для отправки полезной нагрузки на сервер .

Наша полезная нагрузка содержит строку , переданную клиенту. Когда приходит ответ Mono <Payload> , мы можем использовать метод getDataUtf8() для доступа к содержимому String ответа.

Наконец, мы можем запустить интеграционный тест, чтобы увидеть запрос/ответ в действии. Мы отправим строку на сервер и убедимся, что возвращается та же самая строка :

@Test
public void whenSendingAString_thenRevceiveTheSameString() {
ReqResClient client = new ReqResClient();
String string = "Hello RSocket";

assertEquals(string, client.callBlocking(string));

client.dispose();
}

4.2. Выстрелил-забыл

В модели «выстрелил-забыл» клиент не получит ответа от сервера .

В этом примере клиент будет отправлять смоделированные измерения на сервер с интервалом в 50 мс. Сервер опубликует измерения.

Давайте добавим обработчик «выстрелил-забыл» на наш сервер в классе RSocketImpl :

@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
dataPublisher.publish(payload); // forward the payload
return Mono.empty();
} catch (Exception x) {
return Mono.error(x);
}
}

Этот обработчик очень похож на обработчик запроса/ответа. Однако fireAndForget возвращает Mono<Void> вместо Mono<Payload> .

DataPublisher является экземпляром org.reactivestreams.Publisher . Таким образом, он делает полезную нагрузку доступной для подписчиков. Мы воспользуемся этим в примере запроса/потока. ``

Далее мы создадим клиент типа «выстрелил-забыл»:

public class FireNForgetClient {
private final RSocket socket;
private final List<Float> data;

public FireNForgetClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}

/** Send binary velocity (float) every 50ms */
public void sendData() {
data = Collections.unmodifiableList(generateData());
Flux.interval(Duration.ofMillis(50))
.take(data.size())
.map(this::createFloatPayload)
.flatMap(socket::fireAndForget)
.blockLast();
}

// ...
}

Настройка сокета точно такая же, как и раньше.

Метод sendData() использует поток Flux для отправки нескольких сообщений. Для каждого сообщения мы вызываем socket::fireAndForget .

Нам нужно подписаться на ответ Mono<Void> для каждого сообщения . Если мы забудем подписаться, то socket::fireAndForget не будет выполняться.

Оператор flatMap обеспечивает передачу ответов Void подписчику, а оператор blockLast действует как подписчик.

Мы собираемся дождаться следующего раздела, чтобы запустить тест «выстрелил и забыл». В этот момент мы создадим клиент запроса/потока для получения данных, которые были переданы клиентом типа «выстрелил-забыл».

4.3. Запрос/поток

В модели запрос/поток один запрос может получить несколько ответов . Чтобы увидеть это в действии, мы можем использовать пример «выстрелил-забыл». Для этого давайте запросим поток для получения измерений, которые мы отправили в предыдущем разделе.

Как и раньше, давайте начнем с добавления нового слушателя в RSocketImpl на сервере:

@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.from(dataPublisher);
}

Обработчик requestStream возвращает поток Flux<Payload> . Как мы помним из предыдущего раздела, обработчик fireAndForget публиковал входящие данные в dataPublisher. Теперь мы создадим поток Flux , используя тот же самый dataPublisher , что и источник события. При этом данные измерений будут асинхронно передаваться от нашего клиента типа «выстрелил и забыл» к нашему клиенту запроса/потока.

Далее создадим клиент запроса/потока:

public class ReqStreamClient {

private final RSocket socket;

public ReqStreamClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}

public Flux<Float> getDataStream() {
return socket
.requestStream(DefaultPayload.create(DATA_STREAM_NAME))
.map(Payload::getData)
.map(buf -> buf.getFloat())
.onErrorReturn(null);
}

public void dispose() {
this.socket.dispose();
}
}

Подключаемся к серверу так же, как и наши предыдущие клиенты.

В getDataStream() мы используем socket.requestStream() для получения потока Flux<Payload> с сервера . Из этого потока мы извлекаем значения с плавающей запятой из двоичных данных. Наконец, поток возвращается вызывающей стороне, что позволяет вызывающей стороне подписаться на него и обработать результаты.

Теперь давайте тестировать. Мы проверим путь от «выстрелил-забыл» до запроса/потока.

Мы можем утверждать, что каждое значение получено в том же порядке, в котором оно было отправлено. Затем мы можем утверждать, что получаем то же количество значений, которое было отправлено:

@Test
public void whenSendingStream_thenReceiveTheSameStream() {
FireNForgetClient fnfClient = new FireNForgetClient();
ReqStreamClient streamClient = new ReqStreamClient();

List<Float> data = fnfClient.getData();
List<Float> dataReceived = new ArrayList<>();

Disposable subscription = streamClient.getDataStream()
.index()
.subscribe(
tuple -> {
assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
dataReceived.add(tuple.getT2());
},
err -> LOG.error(err.getMessage())
);

fnfClient.sendData();

// ... dispose client & subscription

assertEquals("Wrong data count received", data.size(), dataReceived.size());
}

4.4. Канал

Канальная модель обеспечивает двунаправленную связь . В этой модели потоки сообщений проходят асинхронно в обоих направлениях.

Давайте создадим простую игровую симуляцию, чтобы проверить это. В этой игре каждая сторона канала станет игроком. По ходу игры эти игроки будут отправлять сообщения другой стороне через случайные промежутки времени. Противоположная сторона будет реагировать на сообщения.

Во-первых, мы создадим обработчик на сервере. Как и прежде, мы добавляем в RSocketImpl :

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.subscribe(gameController::processPayload);
return Flux.from(gameController);
}

Обработчик requestChannel имеет потоки полезной нагрузки как для ввода, так и для вывода . Входной параметр Publisher<Payload> — это поток полезных данных, полученных от клиента. По мере поступления эти полезные данные передаются функции gameController::processPayload .

В ответ мы возвращаем клиенту другой поток Flux . Этот поток создается из нашего gameController , который также является Publisher .

Вот краткое описание класса GameController :

public class GameController implements Publisher<Payload> {

@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
// send Payload messages to the subscriber at random intervals
}

public void processPayload(Payload payload) {
// react to messages from the other player
}
}

Когда GameController получает подписчика, он начинает отправлять сообщения этому подписчику.

Далее создадим клиент:

public class ChannelClient {

private final RSocket socket;
private final GameController gameController;

public ChannelClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();

this.gameController = new GameController("Client Player");
}

public void playGame() {
socket.requestChannel(Flux.from(gameController))
.doOnNext(gameController::processPayload)
.blockLast();
}

public void dispose() {
this.socket.dispose();
}
}

Как мы видели в наших предыдущих примерах, клиент подключается к серверу так же, как и другие клиенты.

Клиент создает свой собственный экземпляр GameController .

Мы используем socket.requestChannel() для отправки нашего потока полезной нагрузки на сервер . Сервер отвечает собственным потоком полезной нагрузки.

Как полезные данные, полученные с сервера, мы передаем их нашему обработчику gameController::processPayload .

В нашей игровой симуляции клиент и сервер являются зеркальным отображением друг друга. То есть каждая сторона отправляет поток полезной нагрузки и получает поток полезной нагрузки с другого конца .

Потоки работают независимо, без синхронизации.

Наконец, давайте запустим симуляцию в тесте:

@Test
public void whenRunningChannelGame_thenLogTheResults() {
ChannelClient client = new ChannelClient();
client.playGame();
client.dispose();
}

5. Вывод

В этой вводной статье мы рассмотрели модели взаимодействия, предоставляемые RSocket. Полный исходный код примеров можно найти в нашем репозитории Github .

Обязательно посетите веб- сайт RSocket для более глубокого обсуждения. В частности, документы FAQ и Motivations обеспечивают хорошую основу.