1. Обзор
В этом руководстве мы узнаем, как использовать клиент Reactive HTTP от Jetty . Мы продемонстрируем его использование с различными библиотеками Reactive, создав небольшие тестовые примеры.
2. Что такое реактивный HttpClient
?
HttpClient
Jetty позволяет нам блокировать HTTP-запросы. Однако когда мы имеем дело с реактивным API, мы не можем использовать стандартный HTTP-клиент. Чтобы восполнить этот пробел, Jetty создала оболочку для API HttpClient
, чтобы она также поддерживала API ReactiveStreams .
Реактивный HttpClient
используется либо для потребления, либо для создания потока данных через HTTP-вызовы.
В примере, который мы собираемся продемонстрировать, будет клиент Reactive HTTP, который будет взаимодействовать с сервером Jetty, используя различные библиотеки Reactive. Мы также поговорим о событиях запроса и ответа, предоставляемых Reactive HttpClient
.
Мы рекомендуем прочитать наши статьи о Project Reactor , RxJava и Spring WebFlux , чтобы лучше понять концепции реактивного программирования и его терминологию.
3. Зависимости Maven
Давайте начнем пример с добавления зависимостей для Reactive Streams , Project Reactor , RxJava , Spring WebFlux и Jetty's Reactive HTTPClient
в наш pom.xml.
Наряду с этим мы также добавим зависимость от Jetty Server для создания сервера:
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-reactive-httpclient</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.19.v20190610</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.12.RELEASE</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.11</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.1.9.RELEASE</version>
</dependency>
4. Создание сервера и клиента
Теперь давайте создадим сервер и добавим обработчик запроса, который просто записывает тело запроса в ответ:
public class RequestHandler extends AbstractHandler {
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
jettyRequest.setHandled(true);
response.setContentType(request.getContentType());
IO.copy(request.getInputStream(), response.getOutputStream());
}
}
...
Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();
И тогда мы можем написать HttpClient
:
HttpClient httpClient = new HttpClient();
httpClient.start();
Теперь, когда мы создали клиент и сервер, давайте посмотрим, как мы можем преобразовать этот блокирующий HTTP-клиент в неблокирующий и создать запрос:
Request request = httpClient.newRequest("http://localhost:8080/");
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();
Итак, обертка ReactiveRequest
, предоставленная Jetty, сделала наш блокирующий HTTP-клиент реактивным. Давайте продолжим и посмотрим на его использование с различными реактивными библиотеками.
5. Использование реактивных потоков
HttpClient
Jetty изначально поддерживает Reactive Streams , так что давайте начнем с этого.
Теперь Reactive Streams — это просто набор интерфейсов , поэтому для нашего тестирования давайте реализуем простого блокирующего подписчика:
public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(1);
}
@Override
public void onNext(ReactiveResponse response) {
sink.offer(response);
}
@Override
public void onError(Throwable failure) { }
@Override
public void onComplete() { }
public ReactiveResponse block() throws InterruptedException {
return sink.poll(5, TimeUnit.SECONDS);
}
}
Обратите внимание, что нам нужно было вызвать Subscription#request
в соответствии с JavaDoc, в котором говорится, что « Издатель
не будет отправлять никакие события, пока с помощью этого метода не будет сообщено о запросе».
Также обратите внимание, что мы добавили механизм безопасности, чтобы наш тест мог выйти из строя, если он не увидел значение в течение 5 секунд.
И теперь мы можем быстро протестировать наш HTTP-запрос:
BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
6. Использование реактора проекта
Давайте теперь посмотрим, как мы можем использовать Reactive HttpClient
с Project Reactor. Создание издателя практически такое же, как и в предыдущем разделе.
После создания издателя воспользуемся классом Mono
из Project Reactor, чтобы получить реактивный ответ:
ReactiveResponse response = Mono.from(publisher).block();
И затем мы можем проверить полученный ответ:
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
6.1. Использование Spring WebFlux
Преобразование блокирующего HTTP-клиента в реактивный легко выполняется при использовании Spring WebFlux. Spring WebFlux поставляется с реактивным клиентом WebClient
, который можно использовать с различными библиотеками HTTP-клиентов . Мы можем использовать это как альтернативу использованию прямого кода Project Reactor.
Итак, сначала давайте обернем HTTP-клиент Jetty с помощью JettyClientHttpConnector
, чтобы связать его с WebClient:
ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);
А затем передайте этот коннектор WebClient
для выполнения неблокирующих HTTP-запросов:
WebClient client = WebClient.builder().clientConnector(clientConnector).build();
Затем давайте выполним настоящий HTTP-вызов с помощью только что созданного реактивного HTTP-клиента и проверим результат:
String responseContent = client.post()
.uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
.retrieve()
.bodyToMono(String.class)
.block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);
7. Использование RxJava2
Давайте теперь продолжим и посмотрим, как клиент Reactive HTTP используется с RxJava2 .
Пока мы здесь, давайте немного изменим наш пример, чтобы теперь включить тело в запрос:
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
.content(ReactiveRequest.Content
.fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
.build();
Publisher<String> publisher = reactiveRequest
.response(ReactiveResponse.Content.asString());
Код ReactiveResponse.Content.asString()
преобразует тело ответа в строку. Также можно отклонить ответ с помощью метода ReactiveResponse.Content.discard()
, если нас интересует только статус запроса.
Теперь мы видим, что получение ответа с помощью RxJava2 на самом деле очень похоже на Project Reactor. По сути, мы просто используем Single
вместо Mono
:
String responseContent = Single.fromPublisher(publisher)
.blockingGet();
Assert.assertEquals("Hello World!", responseContent);
8. События запроса и ответа
Клиент Reactive HTTP генерирует ряд событий во время выполнения. Они классифицируются как события запроса и события ответа. Эти события помогают заглянуть в жизненный цикл реактивного HTTP-клиента.
На этот раз давайте немного по-другому сделаем наш реактивный запрос, используя HTTP-клиент вместо запроса:
ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
.content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
.build();
А теперь давайте получим Publisher
событий HTTP-запроса:
Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();
Теперь давайте снова воспользуемся RxJava. На этот раз мы создадим список, содержащий типы событий, и заполним его подпиской на события запросов по мере их возникновения:
List<Type> requestEventTypes = new ArrayList<>();
Flowable.fromPublisher(requestEvents)
.map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());
Затем, поскольку мы находимся в тесте, мы можем заблокировать наш ответ и проверить:
int actualStatus = response.blockingGet().getStatus();
Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);
Точно так же мы можем подписаться и на события ответа. Поскольку они аналогичны подписке на событие запроса, мы добавили сюда только последнюю. Полную реализацию с событиями запроса и ответа можно найти в репозитории GitHub, ссылка на который приведена в конце этой статьи.
9. Заключение
В этом руководстве мы узнали о ReactiveStreams HttpClient
, предоставляемом Jetty, его использовании с различными библиотеками Reactive и событиях жизненного цикла, связанных с запросом Reactive.
Все фрагменты кода, упомянутые в статье, можно найти в нашем репозитории GitHub .