Перейти к основному содержимому

События, отправленные сервером (SSE) в JAX-RS

· 7 мин. чтения

1. Обзор

Server-Sent Events (SSE) — это спецификация, основанная на HTTP, которая позволяет установить продолжительное одноканальное соединение между сервером и клиентом.

Клиент инициирует соединение SSE, используя текст/поток событий типа мультимедиа в заголовке Accept .

Позже он обновляется автоматически, не запрашивая сервер.

Мы можем проверить более подробную информацию о спецификации на официальной спецификации .

В этом руководстве мы познакомим вас с новой реализацией SSE JAX-RS 2.1.

Следовательно, мы рассмотрим, как мы можем публиковать события с помощью JAX-RS Server API. Кроме того, мы рассмотрим, как мы можем использовать их либо с помощью клиентского API JAX-RS, либо просто с помощью HTTP-клиента, такого как инструмент curl .

2. Понимание событий SSE

Событие SSE — это блок текста, состоящий из следующих полей:

  • Событие: тип события. Сервер может отправлять много сообщений разных типов, а клиент может прослушивать только сообщения определенного типа или обрабатывать по-разному каждый тип события.
  • Данные: сообщение, отправленное сервером. У нас может быть много строк данных для одного и того же события.
  • Id: идентификатор события, используемый для отправки заголовка Last-Event-ID после повторной попытки подключения. Это полезно, так как может помешать серверу отправлять уже отправленные события.
  • Retry: время в миллисекундах, в течение которого клиент устанавливает новое соединение при потере текущего. Последний полученный идентификатор будет автоматически отправлен через заголовок Last-Event-ID .
  • ' : ': это комментарий, который клиент игнорирует

Кроме того, два последовательных события разделяются двойной новой строкой ' \n\n '.

Кроме того, данные в одном и том же событии могут быть записаны в несколько строк, как показано в следующем примере:

event: stock
id: 1
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":1,
data: "name":"GOOG","price":75.7119}

event: stock
id: 2
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":2,"name":"IBM","price":83.4611}

В JAX RS событие SSE абстрагируется интерфейсом SseEvent или , точнее, двумя субинтерфейсами OutboundSseEvent и InboundSseEvent.

В то время как OutboundSseEvent используется в API-интерфейсе сервера и разрабатывает отправленное событие, InboundSseEvent используется API-интерфейсом клиента и абстрагирует полученное событие .

3. Публикация событий SSE

Теперь, когда мы обсудили, что такое событие SSE, давайте посмотрим, как мы можем создать и отправить его клиенту HTTP.

3.1. Настройка проекта

У нас уже есть руководство по настройке проекта Maven на основе JAX RS. Не стесняйтесь заглянуть туда, чтобы узнать, как установить зависимости и начать работу с JAX RS.

3.2. Метод ресурса SSE

Метод SSE Resource — это метод JAX RS, который:

  • Может создавать тип мультимедиа text/event-stream
  • Имеет введенный параметр SseEventSink , куда отправляются события
  • Может также иметь введенный параметр Sse , который используется в качестве точки входа для создания построителя событий.
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink, @Context Sse sse) {
//...
}

Как следствие, клиент должен сделать первый запрос HTTP со следующим заголовком HTTP:

Accept: text/event-stream

3.3. Экземпляр SSE

Экземпляр SSE — это компонент контекста, который JAX RS Runtime сделает доступным для внедрения.

Мы могли бы использовать его как фабрику для создания:

  • OutboundSseEvent.Builder — позволяет нам создавать события, а затем
  • SseBroadcaster — позволяет нам транслировать события нескольким подписчикам

Давайте посмотрим, как это работает:

@Context
public void setSse(Sse sse) {
this.sse = sse;
this.eventBuilder = sse.newEventBuilder();
this.sseBroadcaster = sse.newBroadcaster();
}

Теперь давайте сосредоточимся на построителе событий. OutboundSseEvent.Builder отвечает за создание OutboundSseEvent :

OutboundSseEvent sseEvent = this.eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(4000)
.comment("price change")
.build();

Как мы видим, у построителя есть методы для установки значений для всех полей событий, показанных выше . Кроме того, метод mediaType() используется для сериализации объекта Java поля данных в подходящий текстовый формат.

По умолчанию тип носителя для поля данных — text/plain . Следовательно, его не нужно указывать явно при работе с типом данных String .

В противном случае, если мы хотим обработать пользовательский объект, нам нужно указать тип носителя или предоставить пользовательский MessageBodyWriter. JAX RS Runtime предоставляет MessageBodyWriters для наиболее известных типов мультимедиа .

Экземпляр Sse также имеет два ярлыка построителей для создания события только с полем данных или с полями типа и данных:

OutboundSseEvent sseEvent = sse.newEvent("cool Event");
OutboundSseEvent sseEvent = sse.newEvent("typed event", "data Event");

3.4. Отправка простого события

Теперь, когда мы знаем, как создавать события, и понимаем, как работает ресурс SSE. Отправим простое событие.

Интерфейс SseEventSink абстрагирует одно HTTP-соединение. Среда выполнения JAX-RS может сделать его доступным только путем внедрения в метод ресурса SSE.

Отправка события так же проста, как вызов SseEventSink. Отправить().

В следующем примере отправим кучу стоковых обновлений и в итоге закроем поток событий:

@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink /*..*/) {
int lastEventId = //..;
while (running) {
Stock stock = stockService.getNextTransaction(lastEventId);
if (stock != null) {
OutboundSseEvent sseEvent = this.eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(3000)
.comment("price change")
.build();
sseEventSink.send(sseEvent);
lastEventId++;
}
//..
}
sseEventSink.close();
}

После отправки всех событий сервер закрывает соединение, либо явно вызывая метод close() , либо, что предпочтительнее, используя try-with-resource, поскольку SseEventSink расширяет интерфейс AutoClosable :

try (SseEventSink sink = sseEventSink) {
OutboundSseEvent sseEvent = //..
sink.send(sseEvent);
}

В нашем примере приложения мы можем увидеть, как это работает, если мы посетим:

http://localhost:9080/sse-jaxrs-server/sse.html

3.5. Трансляции событий

Широковещательная рассылка — это процесс, посредством которого события одновременно отправляются нескольким клиентам. Это достигается с помощью SseBroadcaster API и выполняется в три простых шага:

Во-первых, мы создаем объект SseBroadcaster из внедренного контекста Sse, как показано ранее:

SseBroadcaster sseBroadcaster = sse.newBroadcaster();

Затем клиенты должны подписаться, чтобы иметь возможность получать события Sse. Обычно это делается в методе ресурса SSE, где вводится экземпляр контекста SseEventSink :

@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void listen(@Context SseEventSink sseEventSink) {
this.sseBroadcaster.register(sseEventSink);
}

И, наконец, мы можем инициировать публикацию события, вызвав метод Broadcast () :

@GET
@Path("publish")
public void broadcast() {
OutboundSseEvent sseEvent = //...;
this.sseBroadcaster.broadcast(sseEvent);
}

Это отправит одно и то же событие каждому зарегистрированному SseEventSink.

Чтобы продемонстрировать трансляцию, мы можем получить доступ к этому URL:

http://localhost:9080/sse-jaxrs-server/sse-broadcast.html

Затем мы можем запустить трансляцию, вызвав метод ресурса Broadcast():

curl -X GET http://localhost:9080/sse-jaxrs-server/sse/stock/publish

4. Использование событий SSE

Чтобы использовать событие SSE, отправленное сервером, мы можем использовать любой HTTP-клиент, но в этом руководстве мы будем использовать клиентский API JAX RS.

4.1. Клиентский API JAX RS для SSE

Чтобы начать работу с клиентским API для SSE, нам нужно предоставить зависимости для реализации клиента JAX RS.

Здесь мы будем использовать реализацию клиента Apache CXF:

<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-client</artifactId>
<version>${cxf-version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-sse</artifactId>
<version>${cxf-version}</version>
</dependency>

SseEventSource является сердцем этого API и создан на основе WebTarget.

Мы начинаем с прослушивания входящих событий, которые абстрагируются интерфейсом InboundSseEvent :

Client client = ClientBuilder.newClient();
WebTarget target = client.target(url);
try (SseEventSource source = SseEventSource.target(target).build()) {
source.register((inboundSseEvent) -> System.out.println(inboundSseEvent));
source.open();
}

После установления соединения зарегистрированный потребитель событий будет вызываться для каждого полученного InboundSseEvent .

Затем мы можем использовать метод readData() для чтения исходной строки данных:

String data = inboundSseEvent.readData();

Или мы можем использовать перегруженную версию, чтобы получить десериализованный объект Java, используя подходящий тип носителя:

Stock stock = inboundSseEvent.readData(Stock.class, MediaType.Application_Json);

Здесь мы только что предоставили простой потребитель событий, который выводит входящее событие в консоль.

5. Вывод

В этом руководстве мы сосредоточились на том, как использовать события, отправленные сервером, в JAX RS 2.1. Мы предоставили пример, демонстрирующий, как отправлять события одному клиенту, а также как транслировать события нескольким клиентам.

Наконец, мы использовали эти события с помощью клиентского API JAX-RS.

Как обычно, код этого туториала можно найти на Github .