Перейти к основному содержимому

HTTP-клиент Jetty ReactiveStreams

· 6 мин. чтения

1. Обзор

В этом руководстве мы узнаем, как использовать клиент Reactive HTTP от Jetty . Мы продемонстрируем его использование с различными библиотеками Reactive, создав небольшие тестовые примеры.

2. Что такое реактивный HttpClient ?

HttpClient Jetty позволяет нам блокировать HTTP-запросы. Однако когда мы имеем дело с реактивным API, мы не можем использовать стандартный HTTP-клиент. Чтобы восполнить этот пробел, Jetty создала оболочку для API HttpClient , чтобы она также поддерживала API ReactiveStreams .

Реактивный HttpClient используется либо для потребления, либо для создания потока данных через HTTP-вызовы.

В примере, который мы собираемся продемонстрировать, будет клиент Reactive HTTP, который будет взаимодействовать с сервером Jetty, используя различные библиотеки Reactive. Мы также поговорим о событиях запроса и ответа, предоставляемых Reactive HttpClient .

Мы рекомендуем прочитать наши статьи о Project Reactor , RxJava и Spring WebFlux , чтобы лучше понять концепции реактивного программирования и его терминологию.

3. Зависимости Maven

Давайте начнем пример с добавления зависимостей для Reactive Streams , Project Reactor , RxJava , Spring WebFlux и Jetty's Reactive HTTPClient в наш pom.xml. Наряду с этим мы также добавим зависимость от Jetty Server для создания сервера:

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-reactive-httpclient</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.19.v20190610</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.12.RELEASE</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.11</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.1.9.RELEASE</version>
</dependency>

4. Создание сервера и клиента

Теперь давайте создадим сервер и добавим обработчик запроса, который просто записывает тело запроса в ответ:

public class RequestHandler extends AbstractHandler {
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
jettyRequest.setHandled(true);
response.setContentType(request.getContentType());
IO.copy(request.getInputStream(), response.getOutputStream());
}
}

...

Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();

И тогда мы можем написать HttpClient :

HttpClient httpClient = new HttpClient();
httpClient.start();

Теперь, когда мы создали клиент и сервер, давайте посмотрим, как мы можем преобразовать этот блокирующий HTTP-клиент в неблокирующий и создать запрос:

Request request = httpClient.newRequest("http://localhost:8080/"); 
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();

Итак, обертка ReactiveRequest , предоставленная Jetty, сделала наш блокирующий HTTP-клиент реактивным. Давайте продолжим и посмотрим на его использование с различными реактивными библиотеками.

5. Использование реактивных потоков

HttpClient Jetty изначально поддерживает Reactive Streams , так что давайте начнем с этого.

Теперь Reactive Streams — это просто набор интерфейсов , поэтому для нашего тестирования давайте реализуем простого блокирующего подписчика:

public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);

@Override
public void onSubscribe(Subscription subscription) {
subscription.request(1);
}

@Override
public void onNext(ReactiveResponse response) {
sink.offer(response);
}

@Override
public void onError(Throwable failure) { }

@Override
public void onComplete() { }

public ReactiveResponse block() throws InterruptedException {
return sink.poll(5, TimeUnit.SECONDS);
}
}

Обратите внимание, что нам нужно было вызвать Subscription#request в соответствии с JavaDoc, в котором говорится, что « Издатель не будет отправлять никакие события, пока с помощью этого метода не будет сообщено о запросе».

Также обратите внимание, что мы добавили механизм безопасности, чтобы наш тест мог выйти из строя, если он не увидел значение в течение 5 секунд.

И теперь мы можем быстро протестировать наш HTTP-запрос:

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6. Использование реактора проекта

Давайте теперь посмотрим, как мы можем использовать Reactive HttpClient с Project Reactor. Создание издателя практически такое же, как и в предыдущем разделе.

После создания издателя воспользуемся классом Mono из Project Reactor, чтобы получить реактивный ответ:

ReactiveResponse response = Mono.from(publisher).block();

И затем мы можем проверить полученный ответ:

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6.1. Использование Spring WebFlux

Преобразование блокирующего HTTP-клиента в реактивный легко выполняется при использовании Spring WebFlux. Spring WebFlux поставляется с реактивным клиентом WebClient , который можно использовать с различными библиотеками HTTP-клиентов . Мы можем использовать это как альтернативу использованию прямого кода Project Reactor.

Итак, сначала давайте обернем HTTP-клиент Jetty с помощью JettyClientHttpConnector , чтобы связать его с WebClient:

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

А затем передайте этот коннектор WebClient для выполнения неблокирующих HTTP-запросов:

WebClient client = WebClient.builder().clientConnector(clientConnector).build();

Затем давайте выполним настоящий HTTP-вызов с помощью только что созданного реактивного HTTP-клиента и проверим результат:

String responseContent = client.post()
.uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
.retrieve()
.bodyToMono(String.class)
.block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

7. Использование RxJava2

Давайте теперь продолжим и посмотрим, как клиент Reactive HTTP используется с RxJava2 .

Пока мы здесь, давайте немного изменим наш пример, чтобы теперь включить тело в запрос:

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
.content(ReactiveRequest.Content
.fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
.build();
Publisher<String> publisher = reactiveRequest
.response(ReactiveResponse.Content.asString());

Код ReactiveResponse.Content.asString() преобразует тело ответа в строку. Также можно отклонить ответ с помощью метода ReactiveResponse.Content.discard() , если нас интересует только статус запроса.

Теперь мы видим, что получение ответа с помощью RxJava2 на самом деле очень похоже на Project Reactor. По сути, мы просто используем Single вместо Mono :

String responseContent = Single.fromPublisher(publisher)
.blockingGet();

Assert.assertEquals("Hello World!", responseContent);

8. События запроса и ответа

Клиент Reactive HTTP генерирует ряд событий во время выполнения. Они классифицируются как события запроса и события ответа. Эти события помогают заглянуть в жизненный цикл реактивного HTTP-клиента.

На этот раз давайте немного по-другому сделаем наш реактивный запрос, используя HTTP-клиент вместо запроса:

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
.content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
.build();

А теперь давайте получим Publisher событий HTTP-запроса:

Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();

Теперь давайте снова воспользуемся RxJava. На этот раз мы создадим список, содержащий типы событий, и заполним его подпиской на события запросов по мере их возникновения:

List<Type> requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
.map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());

Затем, поскольку мы находимся в тесте, мы можем заблокировать наш ответ и проверить:

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

Точно так же мы можем подписаться и на события ответа. Поскольку они аналогичны подписке на событие запроса, мы добавили сюда только последнюю. Полную реализацию с событиями запроса и ответа можно найти в репозитории GitHub, ссылка на который приведена в конце этой статьи.

9. Заключение

В этом руководстве мы узнали о ReactiveStreams HttpClient , предоставляемом Jetty, его использовании с различными библиотеками Reactive и событиях жизненного цикла, связанных с запросом Reactive.

Все фрагменты кода, упомянутые в статье, можно найти в нашем репозитории GitHub .