Перейти к основному содержимому

Асинхронное HTTP-программирование с Play Framework

· 12 мин. чтения

1. Обзор

Часто нашим веб-службам необходимо использовать другие веб-службы для выполнения своей работы. Может быть сложно обслуживать запросы пользователей, сохраняя при этом малое время отклика. Медленная внешняя служба может увеличить наше время отклика и привести к тому, что наша система будет накапливать запросы, используя больше ресурсов. Вот где неблокирующий подход может быть очень полезен

В этом руководстве мы отправим несколько асинхронных запросов к службе из приложения Play Framework . Используя неблокирующие HTTP-возможности Java, мы сможем беспрепятственно запрашивать внешние ресурсы, не затрагивая нашу собственную основную логику.

В нашем примере мы рассмотрим библиотеку Play WebService .

2. Библиотека Play WebService (WS)

WS — это мощная библиотека, обеспечивающая асинхронные HTTP-вызовы с использованием Java Action .

Используя эту библиотеку, наш код отправляет эти запросы и продолжает работу без блокировки. Для обработки результата запроса мы предоставляем потребляющую функцию, то есть реализацию интерфейса Consumer .

Этот шаблон имеет некоторое сходство с реализацией обратных вызовов, промисов и шаблоном async/await в JavaScript.

Давайте создадим простой Consumer , который регистрирует некоторые данные ответа:

ws.url(url)
.thenAccept(r ->
log.debug("Thread#" + Thread.currentThread().getId()
+ " Request complete: Response code = " + r.getStatus()
+ " | Response: " + r.getBody()
+ " | Current Time:" + System.currentTimeMillis()))

Наш Потребитель просто регистрируется в этом примере. Однако потребитель может делать с результатом все, что нам нужно, например сохранять результат в базе данных.

Если мы углубимся в реализацию библиотеки, то увидим, что WS обертывает и настраивает AsyncHttpClient Java , который является частью стандартного JDK и не зависит от Play.

3. Подготовьте пример проекта

Чтобы поэкспериментировать с фреймворком, давайте создадим несколько модульных тестов для запуска запросов. Мы создадим каркас веб-приложения, чтобы отвечать на них, и будем использовать инфраструктуру WS для выполнения HTTP-запросов.

3.1. Скелет веб-приложения

Прежде всего, мы создаем исходный проект с помощью команды sbt new :

sbt new playframework/play-java-seed.g8

Затем в новой папке мы редактируем файл build.sbt и добавляем зависимость библиотеки WS:

libraryDependencies += javaWs

Теперь мы можем запустить сервер командой sbt run :

$ sbt run
...
--- (Running the application, auto-reloading is enabled) ---

[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

После запуска приложения мы можем проверить, все ли в порядке, просмотрев http://localhost:9000 , после чего откроется страница приветствия Play.

3.2. Среда тестирования

Чтобы протестировать наше приложение, мы будем использовать класс модульного теста HomeControllerTest .

Во- первых, нам нужно расширить WithServer , который обеспечит жизненный цикл сервера:

public class HomeControllerTest extends WithServer {

Благодаря своему родителю, этот класс теперь запускает наш каркас веб-сервера в тестовом режиме и на случайном порту перед запуском тестов. Класс WithServer также останавливает приложение после завершения теста.

Далее нам нужно предоставить приложение для запуска.

Мы можем создать его с помощью GuiceApplicationBuilder от Guice : ``

@Override
protected Application provideApplication() {
return new GuiceApplicationBuilder().build();
}

И, наконец, мы настроили URL-адрес сервера для использования в наших тестах, используя номер порта, предоставленный тестовым сервером:

@Override
@Before
public void setup() {
OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
if (optHttpsPort.isPresent()) {
port = optHttpsPort.getAsInt();
url = "https://localhost:" + port;
} else {
port = testServer.getRunningHttpPort()
.getAsInt();
url = "http://localhost:" + port;
}
}

Теперь мы готовы писать тесты. Комплексная среда тестирования позволяет нам сконцентрироваться на кодировании наших тестовых запросов.

4. Подготовьте WSRequest

Давайте посмотрим, как мы можем запускать базовые типы запросов, такие как GET или POST, а также составные запросы на загрузку файлов.

4.1. Инициализировать объект WSRequest

Прежде всего, нам нужно получить экземпляр WSClient для настройки и инициализации наших запросов.

В реальном приложении мы можем получить клиент, автоматически сконфигурированный с настройками по умолчанию, посредством внедрения зависимостей:

@Autowired
WSClient ws;

Однако в нашем тестовом классе мы используем WSTestClient , доступный из среды Play Test :

WSClient ws = play.test.WSTestClient.newClient(port);

Когда у нас есть клиент, мы можем инициализировать объект WSRequest , вызвав метод url :

ws.url(url)

Метод url делает достаточно, чтобы позволить нам запустить запрос. Однако мы можем настроить его дальше, добавив некоторые пользовательские настройки:

ws.url(url)
.addHeader("key", "value")
.addQueryParameter("num", "" + num);

Как мы видим, добавить заголовки и параметры запроса довольно просто.

После того, как мы полностью настроили наш запрос, мы можем вызвать метод, чтобы инициировать его.

4.2. Общий запрос GET

Чтобы инициировать запрос GET, нам просто нужно вызвать метод get для нашего объекта WSRequest :

ws.url(url)
...
.get();

Поскольку это неблокирующий код, он запускает запрос, а затем продолжает выполнение на следующей строке нашей функции.

Объект, возвращаемый get , является экземпляром CompletionStage , который является частью CompletableFuture API .

После завершения HTTP-вызова на этом этапе выполняется всего несколько инструкций. Он заключает ответ в объект WSResponse .

Обычно этот результат передается на следующий этап цепочки выполнения. В этом примере мы не предоставили никакой потребляющей функции, поэтому результат потерян.

По этой причине этот запрос относится к типу «запустил и забыл».

4.3. Отправить форму

Отправка формы не сильно отличается от примера get .

Чтобы инициировать запрос, мы просто вызываем метод post :

ws.url(url)
...
.setContentType("application/x-www-form-urlencoded")
.post("key1=value1&key2=value2");

В этом сценарии нам нужно передать тело в качестве параметра. Это может быть простая строка, такая как файл, документ json или xml, BodyWritable или Source .

4.4. Отправить составные данные/данные формы

Составная форма требует, чтобы мы отправляли как поля ввода, так и данные из вложенного файла или потока.

Чтобы реализовать это во фреймворке, мы используем метод post с Source .

Внутри источника мы можем обернуть все различные типы данных, необходимые для нашей формы:

Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> file =
new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");

ws.url(url)
...
.post(Source.from(Arrays.asList(file, data)));

Хотя этот подход добавляет некоторые дополнительные настройки, он по-прежнему очень похож на другие типы запросов.

5. Обработайте асинхронный ответ

До этого момента мы запускали только запросы типа «выстрелил-забыл», когда наш код ничего не делал с ответными данными.

Давайте теперь рассмотрим два метода обработки асинхронного ответа.

Мы можем либо заблокировать основной поток, ожидая CompletableFuture, либо использовать асинхронно с Consumer .

5.1. Ответ процесса путем блокировки с помощью CompletableFuture

Даже при использовании асинхронной среды мы можем заблокировать выполнение нашего кода и дождаться ответа.

Используя API CompletableFuture , нам нужно всего лишь несколько изменений в нашем коде для реализации этого сценария:

WSResponse response = ws.url(url)
.get()
.toCompletableFuture()
.get();

Это может быть полезно, например, для обеспечения строгой согласованности данных, которую мы не можем достичь другими способами.

5.2. Обработать ответ асинхронно

Чтобы обработать асинхронный ответ без блокировки, мы предоставляем потребителя или функцию , которая запускается асинхронной инфраструктурой, когда ответ доступен.

Например, давайте добавим Consumer в наш предыдущий пример, чтобы регистрировать ответ:

ws.url(url)
.addHeader("key", "value")
.addQueryParameter("num", "" + 1)
.get()
.thenAccept(r ->
log.debug("Thread#" + Thread.currentThread().getId()
+ " Request complete: Response code = " + r.getStatus()
+ " | Response: " + r.getBody()
+ " | Current Time:" + System.currentTimeMillis()));

Затем мы видим ответ в журналах:

[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
"Result" : "ok",
"Params" : {
"num" : [ "1" ]
},
"Headers" : {
"accept" : [ "*/*" ],
"host" : [ "localhost:19001" ],
"key" : [ "value" ],
"user-agent" : [ "AHC/2.1" ]
}
} | Current Time:1579303109613

Стоит отметить, что мы использовали thenAccept , для которого требуется функция Consumer , поскольку нам не нужно ничего возвращать после регистрации.

Когда мы хотим, чтобы текущая стадия что-то вернула, чтобы мы могли использовать это на следующей стадии, вместо этого нам нужно thenApply , который принимает функцию .

Они используют соглашения стандартных функциональных интерфейсов Java .

5.3. Большое тело ответа

Код, который мы реализовали до сих пор, является хорошим решением для небольших ответов и большинства вариантов использования. Однако если нам нужно обработать несколько сотен мегабайт данных, нам понадобится лучшая стратегия.

Следует отметить: методы запроса, такие как get и post , загружают весь ответ в память.

Чтобы избежать возможной ошибки OutOfMemoryError , мы можем использовать Akka Streams для обработки ответа, не позволяя ему заполнить нашу память.

Например, мы можем записать его тело в файл:

ws.url(url)
.stream()
.thenAccept(
response -> {
try {
OutputStream outputStream = Files.newOutputStream(path);
Sink<ByteString, CompletionStage<Done>> outputWriter =
Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
response.getBodyAsSource().runWith(outputWriter, materializer);
} catch (IOException e) {
log.error("An error happened while opening the output stream", e);
}
});

Метод потока возвращает CompletionStage , где WSResponse имеет метод getBodyAsStream , предоставляющий Source<ByteString, ?> .

Мы можем указать коду, как обрабатывать этот тип тела, используя Akka's Sink , который в нашем примере будет просто записывать любые данные, проходящие через OutputStream .

5.4. Тайм-ауты

При создании запроса мы также можем установить определенный тайм-аут, поэтому запрос прерывается, если мы не получаем полный ответ вовремя.

Это особенно полезная функция, когда мы видим, что служба, которую мы запрашиваем, работает особенно медленно и может привести к скоплению открытых соединений, зависших в ожидании ответа.

Мы можем установить глобальный тайм-аут для всех наших запросов, используя параметры настройки. Для тайм-аута для конкретного запроса мы можем добавить к запросу, используя setRequestTimeout :

ws.url(url)
.setRequestTimeout(Duration.of(1, SECONDS));

Однако есть еще один случай: мы можем получить все данные, но наш Потребитель может очень медленно их обрабатывать. Это может произойти, если есть много обработки данных, вызовов базы данных и т. д.

В системах с низкой пропускной способностью мы можем просто позволить коду работать до его завершения. Однако мы можем захотеть прервать длительные действия.

Чтобы достичь этого, мы должны обернуть наш код некоторой обработкой фьючерсов .

Давайте смоделируем очень долгий процесс в нашем коде:

ws.url(url)
.get()
.thenApply(
result -> {
try {
Thread.sleep(10000L);
return Results.ok();
} catch (InterruptedException e) {
return Results.status(SERVICE_UNAVAILABLE);
}
});

Это вернет ответ OK через 10 секунд, но мы не хотим ждать так долго.

Вместо этого с помощью обертки тайм -аута мы указываем нашему коду ждать не более 1 секунды:

CompletionStage<Result> f = futures.timeout(
ws.url(url)
.get()
.thenApply(result -> {
try {
Thread.sleep(10000L);
return Results.ok();
} catch (InterruptedException e) {
return Results.status(SERVICE_UNAVAILABLE);
}
}), 1L, TimeUnit.SECONDS);

Теперь наше будущее будет возвращать результат в любом случае: результат вычисления, если Потребитель завершится вовремя, или исключение из-за тайм- аута фьючерса .

5.5. Обработка исключений

В предыдущем примере мы создали функцию, которая либо возвращает результат, либо завершается ошибкой с исключением. Итак, теперь нам нужно обработать оба сценария.

С помощью метода handleAsync мы можем обрабатывать как успешные, так и неудачные сценарии .

Допустим, мы хотим вернуть результат, если он у нас есть, или запротоколировать ошибку и вернуть исключение для дальнейшей обработки:

CompletionStage<Object> res = f.handleAsync((result, e) -> {
if (e != null) {
log.error("Exception thrown", e);
return e.getCause();
} else {
return result;
}
});

Теперь код должен возвращать CompletionStage , содержащий созданное TimeoutException .

Мы можем проверить это, просто вызвав assertEquals для класса возвращаемого объекта исключения:

Class<?> clazz = res.toCompletableFuture().get().getClass();
assertEquals(TimeoutException.class, clazz);

При запуске теста он также зарегистрирует полученное нами исключение:

[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...

6. Фильтры запросов

Иногда нам нужно запустить некоторую логику, прежде чем запрос будет запущен.

Мы могли бы манипулировать объектом WSRequest после его инициализации, но более элегантным методом является установка WSRequestFilter .

Фильтр может быть установлен во время инициализации перед вызовом метода запуска и привязан к логике запроса.

Мы можем определить свой собственный фильтр, реализовав интерфейс WSRequestFilter , или можем добавить готовый.

Распространенным сценарием является регистрация того, как выглядит запрос, перед его выполнением.

В этом случае нам просто нужно установить AhcCurlRequestLogger :

ws.url(url)
...
.setRequestFilter(new AhcCurlRequestLogger())
...
.get();

Полученный журнал имеет curl - подобный формат:

[info] p.l.w.a.AhcCurlRequestLogger - curl \
--verbose \
--request GET \
--header 'key: value' \
'http://localhost:19001'

Мы можем установить желаемый уровень журнала, изменив нашу конфигурацию logback.xml .

7. Кэширование ответов

WSClient также поддерживает кэширование ответов.

Эта функция особенно полезна, когда один и тот же запрос инициируется несколько раз, и нам не нужны каждый раз самые свежие данные.

Это также помогает, когда служба, которую мы вызываем, временно не работает.

7.1. Добавить зависимости кэширования

Чтобы настроить кеширование, нам нужно сначала добавить зависимость в наш build.sbt :

libraryDependencies += ehcache

Это настраивает Ehcache в качестве нашего уровня кэширования.

Если нам не нужен именно Ehcache, мы можем использовать любую другую реализацию кэша JSR-107 .

7.2. Эвристика принудительного кэширования

По умолчанию Play WS не кэширует HTTP-ответы, если сервер не возвращает какую-либо конфигурацию кэширования.

Чтобы обойти это, мы можем принудительно применить эвристическое кэширование, добавив параметр в наш application.conf :

play.ws.cache.heuristics.enabled=true

Это настроит систему на решение, когда полезно кэшировать HTTP-ответ, независимо от объявленного кэширования удаленной службы.

8. Дополнительная настройка

Выполнение запросов к внешней службе может потребовать некоторой настройки клиента. Возможно, нам придется обрабатывать перенаправления, медленный сервер или некоторую фильтрацию в зависимости от заголовка пользовательского агента.

Чтобы решить эту проблему, мы можем настроить наш WS-клиент, используя свойства в нашем application.conf :

play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# time to wait for the connection to be established
play.ws.timeout.connection=30
# time to wait for data after the connection is open
play.ws.timeout.idle=30
# max time available to complete the request
play.ws.timeout.request=300

Также можно напрямую настроить базовый AsyncHttpClient .

Полный список доступных свойств можно посмотреть в исходном коде AhcConfig .

9. Заключение

В этой статье мы изучили библиотеку Play WS и ее основные функции. Мы настроили наш проект, научились запускать общие запросы и обрабатывать их ответы как синхронно, так и асинхронно.

Мы работали с загрузками больших объемов данных и увидели, как сократить длительные операции.

Наконец, мы рассмотрели кэширование для повышения производительности и способы настройки клиента.

Как всегда, исходный код этого руководства доступен на GitHub .