Перейти к основному содержимому

Реактивный клиентский API JAX-RS

· 6 мин. чтения

1. Введение

В этом руководстве мы рассмотрим поддержку JAX-RS для реактивного (Rx) программирования с использованием Джерси API. В этой статье предполагается, что читатель знаком с клиентским API REST Джерси.

Некоторое знакомство с концепциями реактивного программирования будет полезно, но не обязательно.

2. Зависимости

Во-первых, нам нужны стандартные зависимости клиентской библиотеки Джерси:

<dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-client</artifactId>
    <version>2.27</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.27</version>
</dependency>

Эти зависимости дают нам стандартную поддержку реактивного программирования JAX-RS. Текущие версии jersey-client и jersey-hk2 доступны на Maven Central.

Для поддержки сторонних реактивных фреймворков мы будем использовать следующие расширения:

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-rxjava</artifactId>
    <version>2.27</version>
</dependency>

Приведенная выше зависимость обеспечивает поддержку Observable RxJava ; для нового RxJava2 Flowable мы используем следующее расширение:

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-rxjava2</artifactId>
    <version>2.27</version>
</dependency>

Зависимости от rxjava и rxjava2 также доступны на Maven Central.

3. Зачем нужны реактивные клиенты JAX-RS

Допустим, у нас есть три REST API для использования:

  • id - сервис предоставляет список длинных идентификаторов пользователей
  • служба имен предоставляет имя пользователя для данного идентификатора пользователя
  • хэш - сервис вернет хэш идентификатора пользователя и имени пользователя.

Создаем клиент для каждой из служб:

Client client = ClientBuilder.newClient();
WebTarget userIdService = client.target("http://localhost:8080/id-service/ids");
WebTarget nameService
= client.target("http://localhost:8080/name-service/users/{userId}/name");
WebTarget hashService = client.target("http://localhost:8080/hash-service/{rawValue}");

Это надуманный пример, но он подходит для нашей иллюстрации. Спецификация JAX-RS поддерживает как минимум три подхода к совместному использованию этих сервисов:

  • Синхронный (блокирующий)
  • Асинхронный (неблокирующий)
  • Реактивный (функциональный, неблокирующий)

3.1. Проблема с синхронным вызовом клиента Джерси

При ванильном подходе к использованию этих сервисов мы используем id-сервис для получения идентификаторов пользователей, а затем последовательно вызываем API-интерфейсы службы имен и хэш-сервиса для каждого возвращаемого идентификатора.

При таком подходе каждый вызов блокирует работающий поток до тех пор, пока запрос не будет выполнен , тратя в сумме много времени на выполнение комбинированного запроса. Это явно меньше, чем удовлетворительно в любом нетривиальном случае использования.

3.2. Проблема с асинхронным вызовом клиента Джерси

Более сложный подход заключается в использовании механизма InvocationCallback , поддерживаемого JAX-RS. В самой простой форме мы передаем обратный вызов методу get , чтобы определить, что произойдет, когда данный вызов API завершится.

Хотя теперь мы получаем истинное асинхронное выполнение ( с некоторыми ограничениями на эффективность потоков ), легко увидеть, как этот стиль кода может стать нечитаемым и громоздким в любых сценариях, кроме тривиальных. Спецификация JAX-RS специально выделяет этот сценарий как Пирамиду Судьбы :

// used to keep track of the progress of the subsequent calls
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());

userIdService.request()
.accept(MediaType.APPLICATION_JSON)
.async()
.get(new InvocationCallback<List<Long>>() {
@Override
public void completed(List<Long> employeeIds) {
employeeIds.forEach((id) -> {
// for each employee ID, get the name
nameService.resolveTemplate("userId", id).request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String response) {
hashService.resolveTemplate("rawValue", response + id).request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String response) {
//complete the business logic
}
// ommitted implementation of the failed() method
});
}
// omitted implementation of the failed() method
});
});
}
// omitted implementation of the failed() method
});

// wait for inner requests to complete in 10 seconds
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
logger.warn("Some requests didn't complete within the timeout");
}

Итак, мы получили асинхронный, эффективный по времени код, но:

  • это трудно читать
  • каждый вызов порождает новый поток

Обратите внимание, что мы используем CountDownLatch во всех примерах кода, чтобы дождаться доставки всех ожидаемых значений хэш-службой. Мы делаем это для того, чтобы мы могли утверждать, что код работает в модульном тесте, проверяя, что все ожидаемые значения действительно были доставлены.

Обычный клиент не будет ждать, а сделает все, что нужно сделать с результатом внутри обратного вызова, чтобы не блокировать поток.

3.3. Функциональное реактивное решение

Функциональный и реактивный подход даст нам:

  • Отличная читаемость кода
  • Свободный стиль кодирования
  • Эффективное управление потоками

JAX-RS поддерживает эти цели в следующих компонентах:

  • CompletionStageRxInvoker поддерживает интерфейс CompletionStage в качестве реактивного компонента по умолчанию.
  • RxObservableInvokerProvider поддерживает Observable RxJava.
  • RxFlowableInvokerProvider поддерживает Flowable RxJava

Существует также API для добавления поддержки других библиотек Reactive.

4. Поддержка реактивных компонентов JAX-RS

4.1. Этап завершения в JAX-RS

Используя CompletionStage и его конкретную реализацию CompletableFuture , мы можем написать элегантную, неблокирующую и плавную оркестровку вызовов службы.

Начнем с получения идентификаторов пользователей:

CompletionStage<List<Long>> userIdStage = userIdService.request()
.accept(MediaType.APPLICATION_JSON)
.rx()
.get(new GenericType<List<Long>>() {
}).exceptionally((throwable) -> {
logger.warn("An error has occurred");
return null;
});

Вызов метода rx() — это точка, с которой начинается реактивная обработка. Мы используем исключительную функцию, чтобы плавно определить наш сценарий обработки исключений.

Отсюда мы можем четко организовать вызовы для получения имени пользователя из службы имен, а затем хешировать комбинацию имени и идентификатора пользователя:

List<String> expectedHashValues = ...;
List<String> receivedHashValues = new ArrayList<>();

// used to keep track of the progress of the subsequent calls
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());

userIdStage.thenAcceptAsync(employeeIds -> {
logger.info("id-service result: {}", employeeIds);
employeeIds.forEach((Long id) -> {
CompletableFuture completable = nameService.resolveTemplate("userId", id).request()
.rx()
.get(String.class)
.toCompletableFuture();

completable.thenAccept((String userName) -> {
logger.info("name-service result: {}", userName);
hashService.resolveTemplate("rawValue", userName + id).request()
.rx()
.get(String.class)
.toCompletableFuture()
.thenAcceptAsync(hashValue -> {
logger.info("hash-service result: {}", hashValue);
receivedHashValues.add(hashValue);
completionTracker.countDown();
}).exceptionally((throwable) -> {
logger.warn("Hash computation failed for {}", id);
return null;
});
});
});
});

if (!completionTracker.await(10, TimeUnit.SECONDS)) {
logger.warn("Some requests didn't complete within the timeout");
}

assertThat(receivedHashValues).containsAll(expectedHashValues);

В приведенном выше примере мы составляем наше выполнение трех сервисов с беглым и читаемым кодом.

Метод thenAcceptAsync выполнит предоставленную функцию после того, как данный CompletionStage завершит выполнение (или вызовет исключение).

Каждый последующий вызов не блокируется, что позволяет разумно использовать системные ресурсы.

Интерфейс CompletionStage предоставляет широкий спектр методов подготовки и оркестровки, которые позволяют нам составлять, упорядочивать и асинхронно выполнять любое количество шагов в многоэтапной оркестрации (или один вызов службы).

4.2. Наблюдаемый в JAX-RS

Чтобы использовать компонент Observable RxJava, мы должны сначала зарегистрировать поставщика RxObservableInvokerProvider (а не « ObservableRxInvokerProvider» , как указано в документе спецификации Jersey) на клиенте:

Client client = client.register(RxObservableInvokerProvider.class);

Затем мы переопределяем вызывающую программу по умолчанию:

Observable<List<Long>> userIdObservable = userIdService
.request()
.rx(RxObservableInvoker.class)
.get(new GenericType<List<Long>>(){});

С этого момента мы можем использовать стандартную семантику Observable для организации потока обработки :

userIdObservable.subscribe((List<Long> listOfIds)-> { 
/** define processing flow for each ID */
});

4.3. Текучесть в JAX-RS

Семантика использования RxJava Flowable аналогична семантике Observable. Регистрируем соответствующего провайдера:

client.register(RxFlowableInvokerProvider.class);

Затем мы предоставляем RxFlowableInvoker :

Flowable<List<Long>> userIdFlowable = userIdService
.request()
.rx(RxFlowableInvoker.class)
.get(new GenericType<List<Long>>(){});

После этого мы можем использовать обычный Flowable API.

5. Вывод

Спецификация JAX-RS предоставляет большое количество опций, обеспечивающих чистое, неблокирующее выполнение вызовов REST.

Интерфейс CompletionStage , в частности, предоставляет надежный набор методов, которые охватывают различные сценарии оркестровки служб, а также возможности предоставления пользовательских исполнителей для более детального контроля над управлением потоками.

Вы можете проверить код этой статьи на Github .