1. Введение
Ratpack — это фреймворк, созданный поверх движка Netty , который позволяет нам быстро создавать HTTP-приложения. Мы уже рассмотрели его основное использование в предыдущих статьях . На этот раз мы покажем, как использовать его потоковый API для реализации реактивных приложений .
2. Краткий обзор реактивных потоков
Прежде чем перейти к фактической реализации, давайте сначала кратко рассмотрим, что представляет собой реактивное приложение. По мнению авторов оригинала , такие приложения должны обладать следующими свойствами:
- Отзывчивый
- Устойчивый
- Эластичный
- Сообщение
Итак, как Reactive Streams помогает нам достичь любого из этих свойств? Ну, в этом контексте управление сообщениями
не обязательно подразумевает использование промежуточного программного обеспечения для обмена сообщениями. Вместо этого для решения этой проблемы на самом деле требуется асинхронная обработка запросов и поддержка неблокирующего обратного давления .
Реактивная поддержка Ratpack использует стандарт API Reactive Streams для JVM в качестве основы для своей реализации. Таким образом, он обеспечивает взаимодействие с другими совместимыми платформами, такими как Project Reactor и RxJava .
3. Использование класса потоков
Ratpacks
Класс Streams
Ratpack предоставляет несколько служебных методов для создания экземпляров Publisher
, которые затем можно использовать для создания конвейеров обработки данных.
Хорошей отправной точкой является метод publish()
, который мы можем использовать для создания Publisher
из любого Iterable
:
Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
Здесь LoggingSubscriber
— это тестовая реализация интерфейса подписчика
, которая просто регистрирует каждый объект, созданный издателем. Он также включает вспомогательный метод block()
, который, как следует из названия, блокирует вызывающую программу до тех пор, пока издатель не выпустит все свои объекты или не выдаст ошибку.
Запустив тестовый пример, мы увидим ожидаемую последовательность событий:
onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908
Другой полезный метод — yield()
. Он имеет единственный параметр Function
, который получает объект YieldRequest
и возвращает следующий объект для генерации:
@Test
public void whenYield_thenSuccess() {
Publisher<String> pub = Streams.yield((t) -> {
return t.getRequestNum() < 5 ? "hello" : null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
Параметр YieldRequest
позволяет нам реализовать логику, основанную на количестве сгенерированных объектов, используя его метод getRequestNum()
. В нашем примере мы используем эту информацию для определения конечного условия, которое мы сигнализируем, возвращая нулевое
значение.
Теперь давайте посмотрим, как создать Publisher
для периодических событий:
@Test
public void whenPeriodic_thenSuccess() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
return t < 5 ? String.format("hello %d",t): null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
Возвращенный издатель использует ScheduledExecutorService
для периодического вызова функции производителя, пока она не вернет нулевое
значение. Функция-производитель получает целочисленное значение, соответствующее количеству уже выпущенных объектов, которое мы используем для завершения потока.
4. Использование TransformablePublisher
Присмотревшись к методам Streams
, мы увидим, что они обычно возвращают TransformablePublisher
. Этот интерфейс расширяет возможности Publisher
несколькими служебными методами, которые, подобно тому, что мы находим в Flux
и Mono
в Project Reactor , упрощают создание сложных конвейеров обработки из отдельных шагов .
В качестве примера воспользуемся методом map
для преобразования последовательности целых чисел в строки:
@Test
public void whenMap_thenSuccess() throws Exception {
TransformablePublisher<String> pub = Streams.yield( t -> {
return t.getRequestNum() < 5 ? t.getRequestNum() : null;
})
.map(v -> String.format("item %d", v));
ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
assertTrue("should succeed", result.isSuccess());
assertEquals("should have 5 items",5,result.getValue().size());
}
Здесь фактическое выполнение происходит внутри пула потоков, управляемого тестовым служебным классом ExecHarness
. Так как yieldSingle()
ожидает Promise
, мы используем toList()
для адаптации нашего издателя. Этот метод собирает все результаты, полученные подписчиком, и сохраняет их в списке
.
Как указано в документации, мы должны соблюдать осторожность при использовании этого метода. Применение его к неограниченному издателю может быстро привести к нехватке памяти JVM! Чтобы избежать этой ситуации, мы должны ограничивать его использование в основном модульными тестами .
Помимо map()
, в TransformablePublisher
есть несколько полезных операторов:
filter()
: фильтровать восходящие объекты на основепредиката .
take()
: испускает только первыеn
объектов из вышестоящегоиздателя .
wiretap()
: добавляет точку наблюдения, где мы можем проверять данные и события, когда они проходят через конвейер.reduce()
: уменьшить вышестоящие объекты до одного значенияtransform()
: вводит в поток обычногоиздателя .
5. Использование buffer()
с несовместимыми издателями
В некоторых сценариях мы должны иметь дело с издателем
, который отправляет своим подписчикам больше элементов, чем запрошено . Чтобы справиться с такими сценариями, потоки Ratpack предлагают метод buffer()
, который сохраняет эти дополнительные элементы в памяти до тех пор, пока они не будут использованы подписчиками.
Чтобы проиллюстрировать, как это работает, давайте создадим простой несовместимый издатель
, который игнорировал количество запрошенных элементов. Вместо этого он всегда будет производить как минимум на 5 элементов больше, чем запрошено:
private class NonCompliantPublisher implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
log.info("subscribe");
subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
}
private class NonCompliantSubscription implements Subscription {
private Subscriber<? super Integer> subscriber;
private int recurseLevel = 0;
public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
log.info("request: n={}", n);
if ( recurseLevel > 0 ) {
return;
}
recurseLevel++;
for (int i = 0 ; i < (n + 5) ; i ++ ) {
subscriber.onNext(i);
}
subscriber.onComplete();
}
@Override
public void cancel() {
}
}
}
Во-первых, давайте протестируем этого издателя, используя наш LoggingSubscriber.
Мы будем использовать оператор take()
, чтобы получить только первый элемент.
@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction(""))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
Запустив этот тест, мы видим, что, несмотря на получение запроса cancel()
, наш несоответствующий издатель продолжает создавать новые элементы:
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more expurious data event
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
Теперь давайте добавим в этот поток шаг buffer() .
Мы добавим два шага прослушивания
для регистрации событий перед ним, чтобы его эффект стал более очевидным:
@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction("before buffer"))
.buffer()
.wiretap(new LoggingAction("after buffer"))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
На этот раз запуск этого кода приводит к другой последовательности журнала:
LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=67585214
Сообщения «до буфера» показывают, что наш несовместимый издатель смог отправить все значения после первого вызова request
. Тем не менее, нисходящие значения по-прежнему отправлялись одно за другим , соблюдая количество, запрошенное LoggingSubscriber
.
6. Использование batch()
с медленными подписчиками
Другой сценарий, который может снизить пропускную способность приложения, — это когда нижестоящие подписчики запрашивают данные небольшими объемами. Наш LoggingSubscriber
— хороший пример: он запрашивает только один элемент за раз.
В реальных приложениях это может привести к большому количеству переключений контекста, что снизит общую производительность. Лучший подход — запрашивать большее количество элементов за раз. Метод batch()
позволяет вышестоящему издателю использовать более эффективные размеры запросов, а нижестоящим подписчикам — меньшие размеры запросов.
Давайте посмотрим, как это работает на практике. Как и раньше, мы начнем с потока без пакета
:
@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction(""));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
Здесь CompliantPublisher
— это просто тестовый издатель
, который производит целые числа до, но исключая значение, переданное конструктору. Давайте запустим его, чтобы увидеть непакетное поведение:
CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331
Вывод показывает, что производитель выдает значения одно за другим . Теперь давайте добавим step batch()
в наш конвейер, чтобы вышестоящий издатель создавал до пяти элементов одновременно:
@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction("before batch"))
.batch(5, Action.noop())
.wiretap(new LoggingAction("after batch"));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
Метод batch()
принимает два аргумента: количество элементов, запрошенных при каждом вызове request() , и
действие
для обработки отброшенных элементов, то есть элементов, запрошенных, но не использованных. Эта ситуация может возникнуть, если есть ошибка или нижестоящий подписчик вызывает cancel()
. Посмотрим получившийся лог выполнения:
LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690
Мы видим, что теперь издатель каждый раз получает запросы на пять элементов . Обратите внимание, что в этом тестовом сценарии мы видим два
запроса к производителю еще до того, как подписчик на ведение журнала получит первый элемент. Причина в том, что в этом тестовом сценарии у нас однопоточное выполнение, поэтому batch
() продолжает буферизовать элементы до тех пор, пока не получит сигнал onComplete() .
7. Использование потоков в веб-приложениях
Ratpack поддерживает использование реактивных потоков в сочетании со своей асинхронной веб-платформой.
7.1. Получение потока данных
Для входящих данных объект Request , доступный через
Context
обработчика, предоставляет метод getBodyStream()
, который возвращает TransformablePublisher
объектов ByteBuf
.
Из этого издателя мы можем построить наш конвейер обработки:
@Bean
public Action<Chain> uploadFile() {
return chain -> chain.post("upload", ctx -> {
TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
pub.subscribe(new Subscriber<ByteBuf>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription sub) {
this.sub = sub;
sub.request(1);
}
@Override
public void onNext(ByteBuf t) {
try {
... do something useful with received data
sub.request(1);
}
finally {
// DO NOT FORGET to RELEASE !
t.release();
}
}
@Override
public void onError(Throwable t) {
ctx.getResponse().status(500);
}
@Override
public void onComplete() {
ctx.getResponse().status(202);
}
});
});
}
Есть несколько деталей, которые следует учитывать при реализации подписчиков. Во- первых, мы должны убедиться, что в какой-то момент вызываем метод Release ()
класса ByteBuf
. Невыполнение этого требования приведет к утечке памяти . Во-вторых, любая асинхронная обработка должна использовать только примитивы Ratpack. К ним относятся Promise
, Blocking
и подобные конструкции. ****
``
7.2. Отправка потока данных
Самый прямой способ отправить поток данных — использовать Response.sendStream()
. Этот метод принимает аргумент издателя ByteBuf
и отправляет данные клиенту, применяя обратное давление по мере необходимости, чтобы избежать его переполнения:
@Bean
public Action<Chain> download() {
return chain -> chain.get("download", ctx -> {
ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
});
}
Как бы просто это ни было, у этого метода есть недостаток: он не будет устанавливать сам по себе какой-либо заголовок, включая Content-Length
, что может быть проблемой для клиентов:
$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted
В качестве альтернативы лучше использовать метод Context
render()
дескриптора , передав объект ResponseChunks
. В этом случае ответ будет использовать метод кодирования передачи « фрагментированный ». Самый простой способ создать экземпляр ResponseChunks
— использовать один из статических методов, доступных в этом классе:
@Bean
public Action<Chain> downloadChunks() {
return chain -> chain.get("downloadChunks", ctx -> {
ctx.render(ResponseChunks.bufferChunks("application/octetstream",
new RandomBytesPublisher(1024,512)));
});
}
С этим изменением ответ теперь включает заголовок типа контента :
$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted
7.3. Использование событий на стороне сервера
Поддержка событий на стороне сервера (SSE) также использует метод render()
. Однако в этом случае мы используем ServerSentEvents
для адаптации элементов, поступающих от производителя
, к объектам событий
, которые включают некоторые метаданные вместе с полезной нагрузкой события:
@Bean
public Action<Chain> quotes() {
ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
evt
.id(Long.toString(idSeq.incrementAndGet()))
.event("quote")
.data( q -> q.toString());
});
return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}
Здесь QuotesService
— это всего лишь пример службы, которая создает Publisher
, который через равные промежутки времени генерирует случайные котировки. Второй аргумент — это функция, которая подготавливает событие к отправке. Сюда входит добавление идентификатора
, типа события и самой полезной нагрузки.
Мы можем использовать curl
для тестирования этого метода, получая вывод, который показывает последовательность случайных кавычек вместе с метаданными события:
$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]
... more quotes
7.4. Трансляция данных веб-сокета
Мы можем передавать данные от любого издателя
к соединению WebSocket, используя Websockets.websocketBroadcast()
:
@Bean
public Action<Chain> quotesWS() {
Publisher<String> pub = Streams.transformable(quotesService.newTicker())
.map(Quote::toString);
return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}
Здесь мы используем тот же QuotesService
, который мы видели ранее, в качестве источника событий для трансляции котировок клиентам. Давайте снова воспользуемся curl
для имитации клиента WebSocket:
$ curl --include -v \
--no-buffer \
--header "Connection: Upgrade" \
--header "Upgrade: websocket" \
--header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
--header "Sec-WebSocket-Version: 13" \
http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted
8. Заключение
В этой статье мы рассмотрели поддержку Ratpack для реактивных потоков и способы ее применения в различных сценариях.
Как обычно, полный исходный код примеров можно найти на GitHub .