Перейти к основному содержимому

Пример интеграции Vertx и RxJava

· 6 мин. чтения

1. Обзор

RxJava — популярная библиотека для создания асинхронных программ и программ, основанных на событиях. Она черпает вдохновение из основных идей, выдвинутых инициативой Reactive Extensions .

Vert.x , проект под эгидой Eclipse , предлагает несколько компонентов, разработанных с нуля для полного использования реактивной парадигмы.

При совместном использовании они могут стать надежной основой для любой Java -программы, которая должна быть реактивной.

В этой статье мы загрузим файл со списком названий городов и распечатаем для каждого из них продолжительность дня от восхода до заката.

Мы будем использовать данные, опубликованные из общедоступного REST API www.metaweather.com — для расчета продолжительности дневного света и RxJava с Vert.x , чтобы сделать это чисто реактивным способом. ``

2. Зависимость от Maven

Начнем с импорта vertx-rx-java2 :

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>3.5.0-Beta1</version>
</dependency>

На момент написания статьи интеграция между Vert.x и более новой версией RxJava 2 доступна только в виде бета-версии, которая, тем не менее, достаточно стабильна для создаваемой нами программы.

Обратите внимание, что io.vertx:vertx-rx-java2 зависит от io.reactivex.rxjava2:rxjava , поэтому нет необходимости явно импортировать какой -либо пакет , связанный с RxJava .

Последнюю версию интеграции Vert.x с RxJava можно найти на Maven Central .

3. Настройка

Как и в любом приложении, использующем Vert.x, мы начнем создавать объект vertx , основную точку входа для всех функций Vert.x :

Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();

Библиотека vertx-rx-java2 предоставляет два класса: io.vertx.core.Vertx и io.vertx.reactivex.core.Vertx . В то время как первая является обычной точкой входа для приложений, которые однозначно основаны на Vert.x , последняя — это та, которую мы должны использовать для интеграции с RxJava .

Мы продолжаем определять объекты, которые будем использовать позже:

FileSystem fileSystem = vertx.fileSystem();
HttpClient httpClient = vertx.createHttpClient();

` FileSystem Vert.x предоставляет доступ к файловой системе реактивным способом, а HttpClient Vert.x делает то же самое для HTTP` .

4. Реактивная цепь

В реактивном контексте легко объединить несколько более простых реактивных операторов для получения осмысленных вычислений.

Сделаем это для нашего примера :

fileSystem
.rxReadFile("cities.txt").toFlowable()
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)
.map(extractingWoeid())
.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())
.map(Buffer::toJsonObject)
.map(toCityAndDayLength())
.subscribe(System.out::println, Throwable::printStackTrace);

Давайте теперь рассмотрим, как работает каждый логический фрагмент кода.

5. Названия городов

Первый шаг — прочитать файл, содержащий список названий городов, по одному названию в строке:

fileSystem
.rxReadFile("cities.txt").toFlowable()
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))

Метод rxReadFile() реактивно читает файл и возвращает Single<Buffer > RxJava . Итак, мы получили искомую интеграцию: асинхронность Vert.x в структуре данных от RxJava . ``

Файл всего один, поэтому мы получим одну эмиссию буфера с полным содержимым файла. Мы конвертируем этот вход в Flowable RxJava и сопоставляем строки файла, чтобы иметь Flowable , который вместо этого генерирует событие для каждого названия города.

6. Дескриптор города JSON

Имея название города, следующим шагом будет использование Metaweather REST API для получения кода идентификатора для этого города. Затем этот идентификатор будет использоваться для получения времени восхода и захода солнца в городе. Продолжим цепочку вызовов:

Продолжим цепочку вызовов:

.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)

Метод searchByCityName() использует HttpClient , который мы создали на первом шаге, для вызова службы REST , которая выдает идентификатор города. Затем с помощью второго flatMap() мы получаем буфер , содержащий ответ.

Давайте завершим этот шаг, написав тело searchByCityName() :

Flowable<HttpClientResponse> searchByCityName(HttpClient httpClient, String cityName) {
HttpClientRequest req = httpClient.get(
new RequestOptions()
.setHost("www.metaweather.com")
.setPort(443)
.setSsl(true)
.setURI(format("/api/location/search/?query=%s", cityName)));
return req
.toFlowable()
.doOnSubscribe(subscription -> req.end());
}

HttpClient Vert.x возвращает Flowable RxJava , который выдает реактивный HTTP - ответ. На этом очередь выдает тело ответа, разделенного на Buffers .

Мы создали новый реактивный запрос к правильному URL-адресу, но отметили, что Vert.x требует, чтобы метод HttpClientRequest.end () вызывался для сигнализации о том, что запрос может быть отправлен, а также требуется по крайней мере одна подписка, прежде чем end() сможет быть успешно вызвана.

Решение для достижения этого состоит в том, чтобы использовать doOnSubscribe () RxJava для вызова end() , как только потребитель подписывается.

7. Идентификаторы городов

Теперь нам просто нужно получить значение свойства woeid возвращаемого объекта JSON , которое однозначно идентифицирует город с помощью пользовательского метода:

.map(extractingWoeid())

Метод ExtractingWoeid() возвращает функцию, которая извлекает идентификатор города из JSON , содержащегося в ответе службы REST :

private static Function<Buffer, Long> extractingWoeid() {
return cityBuffer -> cityBuffer
.toJsonArray()
.getJsonObject(0)
.getLong("woeid");
}

Обратите внимание, что мы можем использовать удобные методы toJson…() , предоставляемые Buffer , чтобы быстро получить доступ к нужным нам свойствам.

8. Детали города

Давайте продолжим реактивную цепочку, чтобы получить нужные нам детали из REST API :

.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())

Детализируем метод getDataByPlaceId () :

static Flowable<HttpClientResponse> getDataByPlaceId(
HttpClient httpClient, long placeId) {

return autoPerformingReq(
httpClient,
format("/api/location/%s/", placeId));
}

Здесь мы использовали тот же подход, что и на предыдущем шаге. getDataByPlaceId() возвращает Flowable<HttpClientResponse> . HttpClientResponse , в свою очередь, будет выдавать ответ API порциями, если он длиннее нескольких байтов.

С помощью метода toBufferFlowable() мы уменьшаем фрагменты ответа в один, чтобы иметь доступ к полному объекту JSON:

static Function<HttpClientResponse, Publisher<? extends Buffer>>
toBufferFlowable() {
return response -> response
.toObservable()
.reduce(
Buffer.buffer(),
Buffer::appendBuffer).toFlowable();
}

9. Время заката и восхода солнца

Продолжаем добавлять реактивную цепочку, извлекая интересующую нас информацию из объекта JSON :

.map(toCityAndDayLength())

Напишем метод toCityAndDayLength() :

static Function<JsonObject, CityAndDayLength> toCityAndDayLength() {
return json -> {
ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
String cityName = json.getString("title");
return new CityAndDayLength(
cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
};
}

Он возвращает функцию, которая сопоставляет информацию, содержащуюся в JSON , для создания POJO , который просто вычисляет время в часах между восходом и заходом солнца.

10. Подписка

Реактивная цепь завершена. Теперь мы можем подписаться на полученный Flowable с помощью обработчика, который выводит сгенерированные экземпляры CityAndDayLength или трассировку стека в случае ошибок:

.subscribe(
System.out::println,
Throwable::printStackTrace)

Когда мы запускаем приложение, мы можем увидеть такой результат, в зависимости от города, содержащегося в списке, и даты запуска приложения:

In Chicago there are 13.3 hours of light.
In Milan there are 13.5 hours of light.
In Cairo there are 12.9 hours of light.
In Moscow there are 14.1 hours of light.
In Santiago there are 11.3 hours of light.
In Auckland there are 11.2 hours of light.

Порядок городов может отличаться от указанного в файле, поскольку все запросы к HTTP API выполняются асинхронно.

11. Заключение

В этой статье мы увидели, как легко смешивать реактивные модули Vert.x с операторами и логическими конструкциями, предоставляемыми RxJava .

Построенная нами реактивная цепочка, хотя и длинная, показала, насколько легко написать сложный сценарий.

Как всегда, полный исходный код доступен на GitHub .