Перейти к основному содержимому

Отправленные сервером события весной

· 5 мин. чтения

1. Обзор

В этом руководстве мы увидим, как мы можем реализовать API-интерфейсы на основе Server-Sent-Events с помощью Spring.

Проще говоря, Server-Sent-Events или сокращенно SSE — это стандарт HTTP, который позволяет веб-приложению обрабатывать однонаправленный поток событий и получать обновления всякий раз, когда сервер отправляет данные.

Версия Spring 4.2 уже поддерживала его, но, начиная с Spring 5, у нас появился более идиоматический и удобный способ его обработки .

2. SSE с Spring 5 Webflux

Для этого мы можем использовать такие реализации, как класс Flux , предоставляемый библиотекой Reactor , или потенциально сущность ServerSentEvent , которая дает нам контроль над метаданными событий.

2.1. Потоковые события с использованием Flux

Flux — это реактивное представление потока событий — он обрабатывается по-разному в зависимости от указанного типа носителя запроса или ответа.

Чтобы создать конечную точку потоковой передачи SSE, мы должны следовать спецификациям W3C и обозначить ее MIME-тип как text/event-stream :

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}

Интервальный метод создает Flux , который постепенно выдает длинные значения. Затем мы сопоставляем эти значения с желаемым результатом.

Давайте запустим наше приложение и попробуем его, просмотрев конечную точку.

Посмотрим, как браузер отреагирует на события, отправляемые сервером каждую секунду. Для получения дополнительной информации о Flux и Reactor Core , мы можем проверить этот пост .

2.2. Использование элемента ServerSentEvent

Теперь мы завернем нашу выходную строку в объект ServerSentSevent и рассмотрим преимущества этого:

@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}

Как мы можем оценить, есть несколько преимуществ использования объекта ServerSentEvent :

  1. мы можем обрабатывать метаданные событий, которые нам понадобятся в реальном сценарии
  2. мы можем игнорировать объявление типа мультимедиа « text/event-stream »

В данном случае мы указали id , название события и, самое главное, собственно данные события.

Кроме того, мы могли бы добавить атрибут комментариев и значение повторной попытки, которое будет указывать время повторного подключения, которое будет использоваться при попытке отправить событие.

2.3. Использование событий, отправленных сервером, с помощью веб-клиента

Теперь давайте воспользуемся нашим потоком событий с помощью WebClient .:

public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};

Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);

eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}

Метод подписки позволяет нам указать, как мы будем действовать, когда мы успешно получим событие, когда произойдет ошибка и когда потоковая передача будет завершена.

В нашем примере мы использовали метод извлечения , который представляет собой простой и понятный способ получения тела ответа.

Этот метод автоматически генерирует исключение WebClientResponseException , если мы получаем ответ 4xx или 5xx, если мы не обрабатываем сценарии, добавляя оператор onStatus .

С другой стороны, мы могли бы также использовать метод обмена , который обеспечивает доступ к ClientResponse , а также не сигнализирует об ошибках при неудачных ответах.

Мы должны учитывать, что мы можем обойти оболочку ServerSentEvent , если нам не нужны метаданные события.

3. Потоковая передача SSE в Spring MVC

Как мы уже говорили, спецификация SSE поддерживалась с версии Spring 4.2, когда был представлен класс SseEmitter .

Проще говоря, мы определим ExecutorService , поток, в котором SseEmitter будет выполнять свою работу по отправке данных, и возвращать экземпляр эмиттера, сохраняя соединение открытым следующим образом:

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now().toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}

Всегда выбирайте правильный ExecutorService для своего сценария использования.

Мы можем узнать больше о SSE в Spring MVC и посмотреть другие примеры, прочитав этот интересный учебник .

4. Понимание событий, отправленных сервером

Теперь, когда мы знаем, как реализовать конечные точки SSE, давайте попробуем немного углубиться, разобравшись в некоторых основных концепциях.

SSE — это спецификация, принятая большинством браузеров для однонаправленной потоковой передачи событий в любое время.

«События» — это просто поток текстовых данных в кодировке UTF-8, которые следуют формату, определенному спецификацией.

Этот формат состоит из ряда элементов ключ-значение (идентификатор, повторная попытка, данные и событие, которое указывает имя), разделенных разрывами строк.

Комментарии также поддерживаются.

Спецификация никоим образом не ограничивает формат полезных данных; мы можем использовать простую строку или более сложную структуру JSON или XML.

Последний момент, который мы должны принять во внимание, — это разница между использованием потоковой передачи SSE и WebSockets .

В то время как WebSockets предлагают полнодуплексную (двунаправленную) связь между сервером и клиентом, а SSE использует однонаправленную связь.

Кроме того, WebSockets не является протоколом HTTP и, в отличие от SSE, не предлагает стандартов обработки ошибок.

5. Вывод

Подводя итог, в этой статье мы изучили основные концепции потоковой передачи SSE, которая, несомненно, является отличным ресурсом, который позволит нам создавать системы следующего поколения.

Теперь у нас есть отличная возможность понять, что происходит под капотом, когда мы используем этот протокол.

Кроме того, мы дополнили теорию несколькими простыми примерами, которые можно найти в нашем репозитории на Github .