Перейти к основному содержимому

API реактивных потоков с Ratpack

· 12 мин. чтения

1. Введение

Ratpack — это фреймворк, созданный поверх движка Netty , который позволяет нам быстро создавать HTTP-приложения. Мы уже рассмотрели его основное использование в предыдущих статьях . На этот раз мы покажем, как использовать его потоковый API для реализации реактивных приложений .

2. Краткий обзор реактивных потоков

Прежде чем перейти к фактической реализации, давайте сначала кратко рассмотрим, что представляет собой реактивное приложение. По мнению авторов оригинала , такие приложения должны обладать следующими свойствами:

  • Отзывчивый
  • Устойчивый
  • Эластичный
  • Сообщение

Итак, как Reactive Streams помогает нам достичь любого из этих свойств? Ну, в этом контексте управление сообщениями не обязательно подразумевает использование промежуточного программного обеспечения для обмена сообщениями. Вместо этого для решения этой проблемы на самом деле требуется асинхронная обработка запросов и поддержка неблокирующего обратного давления .

Реактивная поддержка Ratpack использует стандарт API Reactive Streams для JVM в качестве основы для своей реализации. Таким образом, он обеспечивает взаимодействие с другими совместимыми платформами, такими как Project Reactor и RxJava .

3. Использование класса потоков Ratpacks

Класс Streams Ratpack предоставляет несколько служебных методов для создания экземпляров Publisher , которые затем можно использовать для создания конвейеров обработки данных.

Хорошей отправной точкой является метод publish() , который мы можем использовать для создания Publisher из любого Iterable :

Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();

Здесь LoggingSubscriber — это тестовая реализация интерфейса подписчика , которая просто регистрирует каждый объект, созданный издателем. Он также включает вспомогательный метод block() , который, как следует из названия, блокирует вызывающую программу до тех пор, пока издатель не выпустит все свои объекты или не выдаст ошибку.

Запустив тестовый пример, мы увидим ожидаемую последовательность событий:

onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908

Другой полезный метод — yield() . Он имеет единственный параметр Function , который получает объект YieldRequest и возвращает следующий объект для генерации:

@Test
public void whenYield_thenSuccess() {

Publisher<String> pub = Streams.yield((t) -> {
return t.getRequestNum() < 5 ? "hello" : null;
});

LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}

Параметр YieldRequest позволяет нам реализовать логику, основанную на количестве сгенерированных объектов, используя его метод getRequestNum() . В нашем примере мы используем эту информацию для определения конечного условия, которое мы сигнализируем, возвращая нулевое значение.

Теперь давайте посмотрим, как создать Publisher для периодических событий:

@Test
public void whenPeriodic_thenSuccess() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
return t < 5 ? String.format("hello %d",t): null;
});

LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}

Возвращенный издатель использует ScheduledExecutorService для периодического вызова функции производителя, пока она не вернет нулевое значение. Функция-производитель получает целочисленное значение, соответствующее количеству уже выпущенных объектов, которое мы используем для завершения потока.

4. Использование TransformablePublisher

Присмотревшись к методам Streams , мы увидим, что они обычно возвращают TransformablePublisher . Этот интерфейс расширяет возможности Publisher несколькими служебными методами, которые, подобно тому, что мы находим в Flux и Mono в Project Reactor , упрощают создание сложных конвейеров обработки из отдельных шагов .

В качестве примера воспользуемся методом map для преобразования последовательности целых чисел в строки:

@Test
public void whenMap_thenSuccess() throws Exception {
TransformablePublisher<String> pub = Streams.yield( t -> {
return t.getRequestNum() < 5 ? t.getRequestNum() : null;
})
.map(v -> String.format("item %d", v));

ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
assertTrue("should succeed", result.isSuccess());
assertEquals("should have 5 items",5,result.getValue().size());
}

Здесь фактическое выполнение происходит внутри пула потоков, управляемого тестовым служебным классом ExecHarness . Так как yieldSingle() ожидает Promise , мы используем toList() для адаптации нашего издателя. Этот метод собирает все результаты, полученные подписчиком, и сохраняет их в списке .

Как указано в документации, мы должны соблюдать осторожность при использовании этого метода. Применение его к неограниченному издателю может быстро привести к нехватке памяти JVM! Чтобы избежать этой ситуации, мы должны ограничивать его использование в основном модульными тестами .

Помимо map() , в TransformablePublisher есть несколько полезных операторов:

  • filter() : фильтровать восходящие объекты на основе предиката .
  • take() : испускает только первые n объектов из вышестоящего издателя .
  • wiretap() : добавляет точку наблюдения, где мы можем проверять данные и события, когда они проходят через конвейер.
  • reduce() : уменьшить вышестоящие объекты до одного значения
  • transform() : вводит в поток обычного издателя .

5. Использование buffer() с несовместимыми издателями

В некоторых сценариях мы должны иметь дело с издателем , который отправляет своим подписчикам больше элементов, чем запрошено . Чтобы справиться с такими сценариями, потоки Ratpack предлагают метод buffer() , который сохраняет эти дополнительные элементы в памяти до тех пор, пока они не будут использованы подписчиками.

Чтобы проиллюстрировать, как это работает, давайте создадим простой несовместимый издатель , который игнорировал количество запрошенных элементов. Вместо этого он всегда будет производить как минимум на 5 элементов больше, чем запрошено:

private class NonCompliantPublisher implements Publisher<Integer> {

@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
log.info("subscribe");
subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
}

private class NonCompliantSubscription implements Subscription {
private Subscriber<? super Integer> subscriber;
private int recurseLevel = 0;

public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
this.subscriber = subscriber;
}

@Override
public void request(long n) {
log.info("request: n={}", n);
if ( recurseLevel > 0 ) {
return;
}
recurseLevel++;
for (int i = 0 ; i < (n + 5) ; i ++ ) {
subscriber.onNext(i);
}
subscriber.onComplete();
}

@Override
public void cancel() {
}
}
}

Во-первых, давайте протестируем этого издателя, используя наш LoggingSubscriber. Мы будем использовать оператор take() , чтобы получить только первый элемент.

@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction(""))
.take(1);

LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}

Запустив этот тест, мы видим, что, несмотря на получение запроса cancel() , наш несоответствующий издатель продолжает создавать новые элементы:

RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more expurious data event
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145

Теперь давайте добавим в этот поток шаг buffer() . Мы добавим два шага прослушивания для регистрации событий перед ним, чтобы его эффект стал более очевидным:

@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction("before buffer"))
.buffer()
.wiretap(new LoggingAction("after buffer"))
.take(1);

LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}

На этот раз запуск этого кода приводит к другой последовательности журнала:

LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=67585214

Сообщения «до буфера» показывают, что наш несовместимый издатель смог отправить все значения после первого вызова request . Тем не менее, нисходящие значения по-прежнему отправлялись одно за другим , соблюдая количество, запрошенное LoggingSubscriber .

6. Использование batch() с медленными подписчиками

Другой сценарий, который может снизить пропускную способность приложения, — это когда нижестоящие подписчики запрашивают данные небольшими объемами. Наш LoggingSubscriber — хороший пример: он запрашивает только один элемент за раз.

В реальных приложениях это может привести к большому количеству переключений контекста, что снизит общую производительность. Лучший подход — запрашивать большее количество элементов за раз. Метод batch() позволяет вышестоящему издателю использовать более эффективные размеры запросов, а нижестоящим подписчикам — меньшие размеры запросов.

Давайте посмотрим, как это работает на практике. Как и раньше, мы начнем с потока без пакета :

@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction(""));

LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}

Здесь CompliantPublisher — это просто тестовый издатель , который производит целые числа до, но исключая значение, переданное конструктору. Давайте запустим его, чтобы увидеть непакетное поведение:

CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331

Вывод показывает, что производитель выдает значения одно за другим . Теперь давайте добавим step batch() в наш конвейер, чтобы вышестоящий издатель создавал до пяти элементов одновременно:

@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {

TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction("before batch"))
.batch(5, Action.noop())
.wiretap(new LoggingAction("after batch"));

LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}

Метод batch() принимает два аргумента: количество элементов, запрошенных при каждом вызове request() , и действие для обработки отброшенных элементов, то есть элементов, запрошенных, но не использованных. Эта ситуация может возникнуть, если есть ошибка или нижестоящий подписчик вызывает cancel() . Посмотрим получившийся лог выполнения:

LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690

Мы видим, что теперь издатель каждый раз получает запросы на пять элементов . Обратите внимание, что в этом тестовом сценарии мы видим два запроса к производителю еще до того, как подписчик на ведение журнала получит первый элемент. Причина в том, что в этом тестовом сценарии у нас однопоточное выполнение, поэтому batch () продолжает буферизовать элементы до тех пор, пока не получит сигнал onComplete() .

7. Использование потоков в веб-приложениях

Ratpack поддерживает использование реактивных потоков в сочетании со своей асинхронной веб-платформой.

7.1. Получение потока данных

Для входящих данных объект Request , доступный через Context обработчика, предоставляет метод getBodyStream() , который возвращает TransformablePublisher объектов ByteBuf .

Из этого издателя мы можем построить наш конвейер обработки:

@Bean
public Action<Chain> uploadFile() {

return chain -> chain.post("upload", ctx -> {
TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
pub.subscribe(new Subscriber<ByteBuf>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription sub) {
this.sub = sub;
sub.request(1);
}

@Override
public void onNext(ByteBuf t) {
try {
... do something useful with received data
sub.request(1);
}
finally {
// DO NOT FORGET to RELEASE !
t.release();
}
}

@Override
public void onError(Throwable t) {
ctx.getResponse().status(500);
}

@Override
public void onComplete() {
ctx.getResponse().status(202);
}
});
});
}

Есть несколько деталей, которые следует учитывать при реализации подписчиков. Во- первых, мы должны убедиться, что в какой-то момент вызываем метод Release () класса ByteBuf . Невыполнение этого требования приведет к утечке памяти . Во-вторых, любая асинхронная обработка должна использовать только примитивы Ratpack. К ним относятся Promise , Blocking и подобные конструкции. **** ``

7.2. Отправка потока данных

Самый прямой способ отправить поток данных — использовать Response.sendStream() . Этот метод принимает аргумент издателя ByteBuf и отправляет данные клиенту, применяя обратное давление по мере необходимости, чтобы избежать его переполнения:

@Bean
public Action<Chain> download() {
return chain -> chain.get("download", ctx -> {
ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
});
}

Как бы просто это ни было, у этого метода есть недостаток: он не будет устанавливать сам по себе какой-либо заголовок, включая Content-Length , что может быть проблемой для клиентов:

$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted

В качестве альтернативы лучше использовать метод Context render() дескриптора , передав объект ResponseChunks . В этом случае ответ будет использовать метод кодирования передачи « фрагментированный ». Самый простой способ создать экземпляр ResponseChunks — использовать один из статических методов, доступных в этом классе:

@Bean
public Action<Chain> downloadChunks() {
return chain -> chain.get("downloadChunks", ctx -> {
ctx.render(ResponseChunks.bufferChunks("application/octetstream",
new RandomBytesPublisher(1024,512)));
});
}

С этим изменением ответ теперь включает заголовок типа контента :

$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted

7.3. Использование событий на стороне сервера

Поддержка событий на стороне сервера (SSE) также использует метод render() . Однако в этом случае мы используем ServerSentEvents для адаптации элементов, поступающих от производителя , к объектам событий , которые включают некоторые метаданные вместе с полезной нагрузкой события:

@Bean
public Action<Chain> quotes() {
ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
evt
.id(Long.toString(idSeq.incrementAndGet()))
.event("quote")
.data( q -> q.toString());
});

return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}

Здесь QuotesService — это всего лишь пример службы, которая создает Publisher , который через равные промежутки времени генерирует случайные котировки. Второй аргумент — это функция, которая подготавливает событие к отправке. Сюда входит добавление идентификатора , типа события и самой полезной нагрузки.

Мы можем использовать curl для тестирования этого метода, получая вывод, который показывает последовательность случайных кавычек вместе с метаданными события:

$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]

... more quotes

7.4. Трансляция данных веб-сокета

Мы можем передавать данные от любого издателя к соединению WebSocket, используя Websockets.websocketBroadcast() :

@Bean
public Action<Chain> quotesWS() {
Publisher<String> pub = Streams.transformable(quotesService.newTicker())
.map(Quote::toString);
return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}

Здесь мы используем тот же QuotesService , который мы видели ранее, в качестве источника событий для трансляции котировок клиентам. Давайте снова воспользуемся curl для имитации клиента WebSocket:

$ curl --include -v \
--no-buffer \
--header "Connection: Upgrade" \
--header "Upgrade: websocket" \
--header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
--header "Sec-WebSocket-Version: 13" \
http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=

<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted

8. Заключение

В этой статье мы рассмотрели поддержку Ratpack для реактивных потоков и способы ее применения в различных сценариях.

Как обычно, полный исходный код примеров можно найти на GitHub .