Перейти к основному содержимому

WebSockets с Play Framework и Akka

· 10 мин. чтения

Задача: Сумма двух чисел

Напишите функцию twoSum. Которая получает массив целых чисел nums и целую сумму target, а возвращает индексы двух чисел, сумма которых равна target. Любой набор входных данных имеет ровно одно решение, и вы не можете использовать один и тот же элемент дважды. Ответ можно возвращать в любом порядке...

ANDROMEDA

1. Обзор

Когда мы хотим, чтобы наши веб-клиенты поддерживали диалог с нашим сервером, WebSockets может быть полезным решением. WebSockets поддерживают постоянное полнодуплексное соединение. Это дает нам возможность отправлять двунаправленные сообщения между нашим сервером и клиентом.

В этом руководстве мы узнаем, как использовать WebSockets с Akka в Play Framework .

2. Настройка

Давайте настроим простое приложение для чата. Пользователь будет отправлять сообщения на сервер, а сервер ответит сообщением из JSONPlaceholder .

2.1. Настройка приложения Play Framework

Мы создадим это приложение, используя Play Framework.

Давайте следуем инструкциям из раздела Введение в Play на Java , чтобы настроить и запустить простое приложение Play Framework.

2.2. Добавление необходимых файлов JavaScript

Кроме того, нам нужно будет работать с JavaScript для сценариев на стороне клиента. Это позволит нам получать новые сообщения, отправленные с сервера. Для этого мы будем использовать библиотеку jQuery .

Давайте добавим jQuery в конец файла app/views/ index.scala.html :

<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>

2.3. Настройка Акка

Наконец, мы будем использовать Akka для обработки соединений WebSocket на стороне сервера.

Давайте перейдем к файлу build.sbt и добавим зависимости.

Нам нужно добавить зависимости akka-actor и akka-testkit :

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Они нужны нам, чтобы иметь возможность использовать и тестировать код Akka Framework.

Далее мы будем использовать потоки Akka. Итак, добавим зависимость akka-stream :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Наконец, нам нужно вызвать конечную точку отдыха из актора Akka. Для этого нам понадобится зависимость akka-http . Когда мы это сделаем, конечная точка вернет данные JSON, которые нам придется десериализовать, поэтому нам также нужно добавить зависимость akka-http-jackson :

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

Теперь все готово. Давайте посмотрим, как заставить работать WebSockets!

3. Обработка веб-сокетов с актерами Akka

Механизм обработки WebSocket в Play построен вокруг потоков Akka. WebSocket моделируется как поток. Таким образом, входящие сообщения WebSocket передаются в поток, а сообщения, созданные потоком, отправляются клиенту.

Для обработки WebSocket с помощью Актера нам понадобится утилита Play ActorFlow , которая преобразует ActorRef в поток. В основном для этого требуется некоторый код Java с небольшой настройкой.

3.1. Метод контроллера WebSocket

Во-первых, нам нужен экземпляр Materializer . Materializer — это фабрика для двигателей потокового исполнения.

Нам нужно внедрить ActorSystem и Materializer в контроллер app/controllers/HomeController.java :

private ActorSystem actorSystem;
private Materializer materializer;

@Inject
public HomeController(
ActorSystem actorSystem, Materializer materializer) {
this.actorSystem = actorSystem;
this.materializer = materializer;
}

Теперь добавим метод контроллера сокета:

public WebSocket socket() {
return WebSocket.Json
.acceptOrResult(this::createActorFlow);
}

Здесь мы вызываем функцию acceptOrResult , которая принимает заголовок запроса и возвращает будущее. Возвращаемое будущее — это поток для обработки сообщений WebSocket.

Вместо этого мы можем отклонить запрос и вернуть результат отклонения.

Теперь давайте создадим поток:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
createActorFlow(Http.RequestHeader request) {
return CompletableFuture.completedFuture(
F.Either.Right(createFlowForActor()));
}

Класс F в Play Framework определяет набор помощников в стиле функционального программирования. В этом случае мы используем F. Someone.Right , чтобы принять соединение и вернуть поток.

Допустим, мы хотели отклонить соединение, когда клиент не аутентифицирован.

Для этого мы могли бы проверить, установлено ли имя пользователя в сеансе. И если это не так, мы отклоняем соединение с HTTP 403 Forbidden:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
createActorFlow2(Http.RequestHeader request) {
return CompletableFuture.completedFuture(
request.session()
.getOptional("username")
.map(username ->
F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(
createFlowForActor()))
.orElseGet(() -> F.Either.Left(forbidden())));
}

Мы используем F.Either.Left , чтобы отклонить соединение так же, как мы предоставляем поток с F.Either.Right .

Наконец, мы связываем поток с актором, который будет обрабатывать сообщения:

private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
return ActorFlow.actorRef(out -> Messenger.props(out),
actorSystem, materializer);
}

ActorFlow.actorRef создает поток, который обрабатывается актором Messenger .

3.2. Файл маршрутов _

Теперь давайте добавим определения маршрутов для методов контроллера в conf/routes :

GET  /                    controllers.HomeController.index(request: Request)
GET /chat controllers.HomeController.socket
GET /chat/with/streams controllers.HomeController.akkaStreamsSocket
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

Эти определения маршрута сопоставляют входящие HTTP-запросы с методами действия контроллера, как описано в разделе «Маршрутизация в приложениях Play на Java» .

3.3. Реализация актера

Наиболее важной частью класса актора является метод createReceive , который определяет, какие сообщения может обрабатывать актор:

@Override
public Receive createReceive() {
return receiveBuilder()
.match(JsonNode.class, this::onSendMessage)
.matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
.build();
}

Актер будет пересылать все сообщения, соответствующие классу JsonNode , в метод обработчика onSendMessage :

private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
//..
processMessage(requestDTO);
}

Затем обработчик будет отвечать на каждое сообщение с помощью метода processMessage :

private void processMessage(RequestDTO requestDTO) {
CompletionStage<HttpResponse> responseFuture = getRandomMessage();
responseFuture.thenCompose(this::consumeHttpResponse)
.thenAccept(messageDTO ->
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}

3.4. Использование Rest API с Akka HTTP

Мы будем отправлять HTTP-запросы фиктивному генератору сообщений в JSONPlaceholder Posts . Когда приходит ответ, мы отправляем ответ клиенту, записывая его .

Давайте создадим метод, который вызывает конечную точку со случайным идентификатором сообщения:

private CompletionStage<HttpResponse> getRandomMessage() {
int postId = ThreadLocalRandom.current().nextInt(0, 100);
return Http.get(getContext().getSystem())
.singleRequest(HttpRequest.create(
"https://jsonplaceholder.typicode.com/posts/" + postId));
}

Мы также обрабатываем HttpResponse , полученный при вызове службы, чтобы получить ответ JSON:

private CompletionStage<MessageDTO> consumeHttpResponse(
HttpResponse httpResponse) {
Materializer materializer =
Materializer.matFromSystem(getContext().getSystem());
return Jackson.unmarshaller(MessageDTO.class)
.unmarshal(httpResponse.entity(), materializer)
.thenApply(messageDTO -> {
log.info("Received message: {}", messageDTO);
discardEntity(httpResponse, materializer);
return messageDTO;
});
}

Класс MessageConverter — это утилита для преобразования между JsonNode и DTO:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(jsonNode, MessageDTO.class);
}

Далее нам нужно отбросить сущность . Удобный метод discardEntityBytes служит для простого удаления объекта, если он не имеет для нас никакой цели .

Давайте посмотрим, как отбросить байты:

private void discardEntity(
HttpResponse httpResponse, Materializer materializer) {
HttpMessage.DiscardedEntity discarded =
httpResponse.discardEntityBytes(materializer);
discarded.completionStage()
.whenComplete((done, ex) ->
log.info("Entity discarded completely!"));
}

Теперь, выполнив обработку WebSocket, давайте посмотрим, как мы можем настроить клиент для этого с помощью WebSockets HTML5.

4. Настройка клиента WebSocket

Для нашего клиента давайте создадим простое веб-приложение для чата.

4.1. Действие контроллера

Нам нужно определить действие контроллера, которое отображает индексную страницу. Мы поместим это в класс контроллера app.controllers.HomeController :

public Result index(Http.Request request) {
String url = routes.HomeController.socket()
.webSocketURL(request);
return ok(views.html.index.render(url));
}

4.2. Страница шаблона

Теперь давайте перейдем на страницу app/views/ndex.scala.html и добавим контейнер для полученных сообщений и форму для захвата нового сообщения:

<div id="messageContent"></div>F
<form>
<textarea id="messageInput"></textarea>
<button id="sendButton">Send</button>
</form>

Нам также потребуется передать URL-адрес действия контроллера WebSocket, объявив этот параметр в верхней части страницы app/views/index.scala.html `` :

@(url: String)

4.3. Обработчики событий WebSocket в JavaScript

И теперь мы можем добавить JavaScript для обработки событий WebSocket. Для простоты мы добавим функции JavaScript внизу страницы app/views/index.scala.html .

Объявим обработчики событий:

var webSocket;
var messageInput;

function init() {
initWebSocket();
}

function initWebSocket() {
webSocket = new WebSocket("@url");
webSocket.onopen = onOpen;
webSocket.onclose = onClose;
webSocket.onmessage = onMessage;
webSocket.onerror = onError;
}

Добавим сами обработчики:

function onOpen(evt) {
writeToScreen("CONNECTED");
}

function onClose(evt) {
writeToScreen("DISCONNECTED");
}

function onError(evt) {
writeToScreen("ERROR: " + JSON.stringify(evt));
}

function onMessage(evt) {
var receivedData = JSON.parse(evt.data);
appendMessageToView("Server", receivedData.body);
}

Затем, чтобы представить вывод, мы будем использовать функции appendMessageToView и writeToScreen :

function appendMessageToView(title, message) {
$("#messageContent").append("<p>" + title + ": " + message + "</p>");
}

function writeToScreen(message) {
console.log("New message: ", message);
}

4.4. Запуск и тестирование приложения

Мы готовы протестировать приложение, поэтому давайте запустим его:

cd websockets
sbt run

Когда приложение запущено, мы можем общаться с сервером, посетив http://localhost:9000 :

./1b5d140b0eea5c69e348d71beeca9700.png

Каждый раз, когда мы набираем сообщение и нажимаем « Отправить », сервер немедленно отвечает некоторым lorem ipsum из службы JSON Placeholder.

5. Обработка веб-сокетов напрямую с помощью Akka Streams

Если мы обрабатываем поток событий из источника и отправляем их клиенту, то мы можем смоделировать это на основе потоков Akka.

Давайте посмотрим, как мы можем использовать потоки Akka на примере, когда сервер отправляет сообщения каждые две секунды.

Начнем с действия WebSocket в HomeController :

public WebSocket akkaStreamsSocket() {
return WebSocket.Json.accept(request -> {
Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
MessageDTO messageDTO =
new MessageDTO("1", "1", "Title", "Test Body");
Source<JsonNode, ?> out = Source.tick(
Duration.ofSeconds(2),
Duration.ofSeconds(2),
MessageConverter.messageToJsonNode(messageDTO)
);
return Flow.fromSinkAndSource(in, out);
});
}

Метод тика Source# принимает три параметра. Первая — начальная задержка перед обработкой первого тика, а вторая — интервал между последовательными тиками. Мы установили оба значения на две секунды в приведенном выше фрагменте. Третий параметр — это объект, который должен возвращаться на каждом тике. ``

Чтобы увидеть это в действии, нам нужно изменить URL-адрес в действии index и сделать так, чтобы он указывал на конечную точку akkaStreamsSocket :

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

А теперь, обновляя страницу, мы будем видеть новую запись каждые две секунды:

./66435fe501efe956a8039c3815711e01.png

6. Увольнение актера

В какой-то момент нам нужно будет закрыть чат либо по запросу пользователя, либо по тайм-ауту.

6.1. Обработка прекращения действия актера

Как определить, что WebSocket был закрыт?

Play автоматически закроет WebSocket, когда актор, обрабатывающий WebSocket, завершится. Таким образом, мы можем справиться с этим сценарием, реализуя метод Actor#postStop :

@Override
public void postStop() throws Exception {
log.info("Messenger actor stopped at {}",
OffsetDateTime.now()
.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}

6.2. Завершение актора вручную

Кроме того, если нам нужно остановить актера, мы можем послать ему PoisonPill . В нашем примерном приложении мы должны иметь возможность обрабатывать запрос «стоп».

Давайте посмотрим, как это сделать в методе onSendMessage :

private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
if("stop".equals(message)) {
MessageDTO messageDTO =
createMessageDTO("1", "1", "Stop", "Stopping actor");
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
self().tell(PoisonPill.getInstance(), getSelf());
} else {
log.info("Actor received. {}", requestDTO);
processMessage(requestDTO);
}
}

Когда мы получаем сообщение, мы проверяем, является ли это запросом на остановку. Если это так, мы посылаем PoisonPill . В противном случае мы обрабатываем запрос.

7. Параметры конфигурации

Мы можем настроить несколько параметров с точки зрения того, как следует обрабатывать WebSocket. Давайте посмотрим на несколько.

7.1. Длина кадра веб-сокета

Связь WebSocket включает обмен фреймами данных.

Длина кадра WebSocket настраивается. У нас есть возможность отрегулировать длину рамы в соответствии с требованиями нашего приложения.

Настройка более короткой длины кадра может помочь снизить количество атак типа «отказ в обслуживании», в которых используются длинные кадры данных. Мы можем изменить длину кадра для приложения, указав максимальную длину в application.conf :

play.server.websocket.frame.maxLength = 64k

Мы также можем установить этот параметр конфигурации, указав максимальную длину в качестве параметра командной строки:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. Тайм-аут простоя соединения

По умолчанию актор, который мы используем для обработки WebSocket, завершается через одну минуту. Это связано с тем, что сервер Play, на котором работает наше приложение, имеет тайм-аут простоя по умолчанию, равный 60 секундам. Это означает, что все соединения, которые не получают запрос в течение шестидесяти секунд, закрываются автоматически.

Мы можем изменить это с помощью параметров конфигурации. Давайте перейдем к нашему application.conf и изменим сервер, чтобы он не имел таймаута простоя:

play.server.http.idleTimeout = "infinite"

Или мы можем передать параметр в качестве аргументов командной строки:

sbt -Dhttp.idleTimeout=infinite run

Мы также можем настроить это, указав devSettings в build.sbt .

Параметры конфигурации, указанные в build.sbt , используются только в разработке, в продакшене они будут игнорироваться:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

Если мы перезапустим приложение, актор не завершится.

Мы можем изменить значение на секунды:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

Подробнее о доступных параметрах конфигурации мы можем узнать в документации Play Framework .

8. Заключение

В этом руководстве мы реализовали WebSockets в Play Framework с актерами Akka и потоками Akka.

Затем мы рассмотрели, как напрямую использовать акторы Akka, а затем увидели, как можно настроить Akka Streams для обработки соединения WebSocket.

На стороне клиента мы использовали JavaScript для обработки событий WebSocket.

Наконец, мы рассмотрели некоторые параметры конфигурации, которые мы можем использовать.

Как обычно, исходный код этого руководства доступен на GitHub .