1. Обзор
Server-Sent Events (SSE) — это спецификация, основанная на HTTP, которая позволяет установить продолжительное одноканальное соединение между сервером и клиентом.
Клиент инициирует соединение SSE, используя текст/поток событий
типа мультимедиа в заголовке Accept .
Позже он обновляется автоматически, не запрашивая сервер.
Мы можем проверить более подробную информацию о спецификации на официальной спецификации .
В этом руководстве мы познакомим вас с новой реализацией SSE JAX-RS 2.1.
Следовательно, мы рассмотрим, как мы можем публиковать события с помощью JAX-RS Server API. Кроме того, мы рассмотрим, как мы можем использовать их либо с помощью клиентского API JAX-RS, либо просто с помощью HTTP-клиента, такого как инструмент curl .
2. Понимание событий SSE
Событие SSE — это блок текста, состоящий из следующих полей:
Событие:
тип события. Сервер может отправлять много сообщений разных типов, а клиент может прослушивать только сообщения определенного типа или обрабатывать по-разному каждый тип события.Данные:
сообщение, отправленное сервером. У нас может быть много строк данных для одного и того же события.Id:
идентификатор события, используемый для отправки заголовкаLast-Event-ID
после повторной попытки подключения. Это полезно, так как может помешать серверу отправлять уже отправленные события.Retry:
время в миллисекундах, в течение которого клиент устанавливает новое соединение при потере текущего. Последний полученный идентификатор будет автоматически отправлен через заголовокLast-Event-ID .
- '
:
': это комментарий, который клиент игнорирует
Кроме того, два последовательных события разделяются двойной новой строкой ' \n\n
'.
Кроме того, данные в одном и том же событии могут быть записаны в несколько строк, как показано в следующем примере:
event: stock
id: 1
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":1,
data: "name":"GOOG","price":75.7119}
event: stock
id: 2
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":2,"name":"IBM","price":83.4611}
В JAX RS
событие SSE абстрагируется интерфейсом SseEvent или
,
точнее, двумя субинтерфейсами OutboundSseEvent
и InboundSseEvent.
В то время как OutboundSseEvent
используется в API-интерфейсе сервера и разрабатывает отправленное событие, InboundSseEvent
используется API-интерфейсом клиента и абстрагирует полученное событие .
3. Публикация событий SSE
Теперь, когда мы обсудили, что такое событие SSE, давайте посмотрим, как мы можем создать и отправить его клиенту HTTP.
3.1. Настройка проекта
У нас уже есть руководство по настройке проекта Maven на основе JAX RS. Не стесняйтесь заглянуть туда, чтобы узнать, как установить зависимости и начать работу с JAX RS.
3.2. Метод ресурса SSE
Метод SSE Resource — это метод JAX RS, который:
- Может создавать тип мультимедиа
text/event-stream
- Имеет введенный параметр
SseEventSink
, куда отправляются события - Может также иметь введенный параметр
Sse
, который используется в качестве точки входа для создания построителя событий.
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink, @Context Sse sse) {
//...
}
Как следствие, клиент должен сделать первый запрос HTTP со следующим заголовком HTTP:
Accept: text/event-stream
3.3. Экземпляр SSE
Экземпляр SSE — это компонент контекста, который JAX RS Runtime сделает доступным для внедрения.
Мы могли бы использовать его как фабрику для создания:
OutboundSseEvent.Builder —
позволяет нам создавать события, а затемSseBroadcaster —
позволяет нам транслировать события нескольким подписчикам
Давайте посмотрим, как это работает:
@Context
public void setSse(Sse sse) {
this.sse = sse;
this.eventBuilder = sse.newEventBuilder();
this.sseBroadcaster = sse.newBroadcaster();
}
Теперь давайте сосредоточимся на построителе событий. OutboundSseEvent.Builder
отвечает за создание OutboundSseEvent
:
OutboundSseEvent sseEvent = this.eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(4000)
.comment("price change")
.build();
Как мы видим, у построителя есть методы для установки значений для всех полей событий, показанных выше . Кроме того, метод mediaType()
используется для сериализации объекта Java поля данных в подходящий текстовый формат.
По умолчанию тип носителя для поля данных — text/plain
. Следовательно, его не нужно указывать явно при работе с типом данных String .
В противном случае, если мы хотим обработать пользовательский объект, нам нужно указать тип носителя или предоставить пользовательский MessageBodyWriter.
JAX RS Runtime предоставляет MessageBodyWriters
для наиболее известных типов мультимедиа .
Экземпляр Sse также имеет два ярлыка построителей для создания события только с полем данных или с полями типа и данных:
OutboundSseEvent sseEvent = sse.newEvent("cool Event");
OutboundSseEvent sseEvent = sse.newEvent("typed event", "data Event");
3.4. Отправка простого события
Теперь, когда мы знаем, как создавать события, и понимаем, как работает ресурс SSE. Отправим простое событие.
Интерфейс SseEventSink
абстрагирует одно HTTP-соединение. Среда выполнения JAX-RS может сделать его доступным только путем внедрения в метод ресурса SSE.
Отправка события так же проста, как вызов SseEventSink.
Отправить().
В следующем примере отправим кучу стоковых обновлений и в итоге закроем поток событий:
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink /*..*/) {
int lastEventId = //..;
while (running) {
Stock stock = stockService.getNextTransaction(lastEventId);
if (stock != null) {
OutboundSseEvent sseEvent = this.eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(3000)
.comment("price change")
.build();
sseEventSink.send(sseEvent);
lastEventId++;
}
//..
}
sseEventSink.close();
}
После отправки всех событий сервер закрывает соединение, либо явно вызывая метод close()
, либо, что предпочтительнее, используя try-with-resource,
поскольку SseEventSink
расширяет интерфейс AutoClosable
:
try (SseEventSink sink = sseEventSink) {
OutboundSseEvent sseEvent = //..
sink.send(sseEvent);
}
В нашем примере приложения мы можем увидеть, как это работает, если мы посетим:
http://localhost:9080/sse-jaxrs-server/sse.html
3.5. Трансляции событий
Широковещательная рассылка — это процесс, посредством которого события одновременно отправляются нескольким клиентам. Это достигается с помощью SseBroadcaster
API и выполняется в три простых шага:
Во-первых, мы создаем объект SseBroadcaster
из внедренного контекста Sse, как показано ранее:
SseBroadcaster sseBroadcaster = sse.newBroadcaster();
Затем клиенты должны подписаться, чтобы иметь возможность получать события Sse. Обычно это делается в методе ресурса SSE, где вводится экземпляр контекста SseEventSink :
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void listen(@Context SseEventSink sseEventSink) {
this.sseBroadcaster.register(sseEventSink);
}
И, наконец, мы можем инициировать публикацию события, вызвав метод Broadcast ()
:
@GET
@Path("publish")
public void broadcast() {
OutboundSseEvent sseEvent = //...;
this.sseBroadcaster.broadcast(sseEvent);
}
Это отправит одно и то же событие каждому зарегистрированному SseEventSink.
Чтобы продемонстрировать трансляцию, мы можем получить доступ к этому URL:
http://localhost:9080/sse-jaxrs-server/sse-broadcast.html
Затем мы можем запустить трансляцию, вызвав метод ресурса Broadcast():
curl -X GET http://localhost:9080/sse-jaxrs-server/sse/stock/publish
4. Использование событий SSE
Чтобы использовать событие SSE, отправленное сервером, мы можем использовать любой HTTP-клиент, но в этом руководстве мы будем использовать клиентский API JAX RS.
4.1. Клиентский API JAX RS для SSE
Чтобы начать работу с клиентским API для SSE, нам нужно предоставить зависимости для реализации клиента JAX RS.
Здесь мы будем использовать реализацию клиента Apache CXF:
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-client</artifactId>
<version>${cxf-version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-sse</artifactId>
<version>${cxf-version}</version>
</dependency>
SseEventSource является сердцем этого API и создан на основе
WebTarget.
Мы начинаем с прослушивания входящих событий, которые абстрагируются интерфейсом InboundSseEvent
:
Client client = ClientBuilder.newClient();
WebTarget target = client.target(url);
try (SseEventSource source = SseEventSource.target(target).build()) {
source.register((inboundSseEvent) -> System.out.println(inboundSseEvent));
source.open();
}
После установления соединения зарегистрированный потребитель событий будет вызываться для каждого полученного InboundSseEvent .
Затем мы можем использовать метод readData()
для чтения исходной строки данных:
String data = inboundSseEvent.readData();
Или мы можем использовать перегруженную версию, чтобы получить десериализованный объект Java, используя подходящий тип носителя:
Stock stock = inboundSseEvent.readData(Stock.class, MediaType.Application_Json);
Здесь мы только что предоставили простой потребитель событий, который выводит входящее событие в консоль.
5. Вывод
В этом руководстве мы сосредоточились на том, как использовать события, отправленные сервером, в JAX RS 2.1. Мы предоставили пример, демонстрирующий, как отправлять события одному клиенту, а также как транслировать события нескольким клиентам.
Наконец, мы использовали эти события с помощью клиентского API JAX-RS.
Как обычно, код этого туториала можно найти на Github .