1. Обзор
В этом руководстве мы увидим, как мы можем реализовать API-интерфейсы на основе Server-Sent-Events с помощью Spring.
Проще говоря, Server-Sent-Events или сокращенно SSE — это стандарт HTTP, который позволяет веб-приложению обрабатывать однонаправленный поток событий и получать обновления всякий раз, когда сервер отправляет данные.
Версия Spring 4.2 уже поддерживала его, но, начиная с Spring 5, у нас появился более идиоматический и удобный способ его обработки .
2. SSE с Spring 5 Webflux
Для этого мы можем использовать такие реализации, как класс Flux
, предоставляемый библиотекой Reactor
, или потенциально сущность ServerSentEvent
, которая дает нам контроль над метаданными событий.
2.1. Потоковые события с использованием Flux
Flux
— это реактивное представление потока событий — он обрабатывается по-разному в зависимости от указанного типа носителя запроса или ответа.
Чтобы создать конечную точку потоковой передачи SSE, мы должны следовать спецификациям W3C и обозначить ее MIME-тип как text/event-stream
:
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}
Интервальный метод создает Flux , который
постепенно
выдает длинные
значения. Затем мы сопоставляем эти значения с желаемым результатом.
Давайте запустим наше приложение и попробуем его, просмотрев конечную точку.
Посмотрим, как браузер отреагирует на события, отправляемые сервером каждую секунду. Для получения дополнительной информации о Flux
и Reactor Core
, мы можем проверить этот пост .
2.2. Использование элемента ServerSentEvent
Теперь мы завернем нашу выходную строку
в объект ServerSentSevent
и рассмотрим преимущества этого:
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}
Как мы можем оценить, есть несколько преимуществ использования объекта ServerSentEvent
:
- мы можем обрабатывать метаданные событий, которые нам понадобятся в реальном сценарии
- мы можем игнорировать объявление типа мультимедиа «
text/event-stream »
В данном случае мы указали id
, название события
и, самое главное, собственно данные
события.
Кроме того, мы могли бы добавить атрибут комментариев
и значение повторной
попытки, которое будет указывать время повторного подключения, которое будет использоваться при попытке отправить событие.
2.3. Использование событий, отправленных сервером, с помощью веб-клиента
Теперь давайте воспользуемся нашим потоком событий с помощью WebClient
.:
public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
Метод подписки
позволяет нам указать, как мы будем действовать, когда мы успешно получим событие, когда произойдет ошибка и когда потоковая передача будет завершена.
В нашем примере мы использовали метод извлечения
, который представляет собой простой и понятный способ получения тела ответа.
Этот метод автоматически генерирует исключение WebClientResponseException
, если мы получаем ответ 4xx или 5xx, если мы не обрабатываем сценарии, добавляя оператор onStatus
.
С другой стороны, мы могли бы также использовать метод обмена
, который обеспечивает доступ к ClientResponse
, а также не сигнализирует об ошибках при неудачных ответах.
Мы должны учитывать, что мы можем обойти оболочку ServerSentEvent
, если нам не нужны метаданные события.
3. Потоковая передача SSE в Spring MVC
Как мы уже говорили, спецификация SSE поддерживалась с версии Spring 4.2, когда был представлен класс SseEmitter .
Проще говоря, мы определим ExecutorService
, поток, в котором SseEmitter
будет выполнять свою работу по отправке данных, и возвращать экземпляр эмиттера, сохраняя соединение открытым следующим образом:
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now().toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
Всегда выбирайте правильный ExecutorService
для своего сценария использования.
Мы можем узнать больше о SSE в Spring MVC и посмотреть другие примеры, прочитав этот интересный учебник .
4. Понимание событий, отправленных сервером
Теперь, когда мы знаем, как реализовать конечные точки SSE, давайте попробуем немного углубиться, разобравшись в некоторых основных концепциях.
SSE — это спецификация, принятая большинством браузеров для однонаправленной потоковой передачи событий в любое время.
«События» — это просто поток текстовых данных в кодировке UTF-8, которые следуют формату, определенному спецификацией.
Этот формат состоит из ряда элементов ключ-значение (идентификатор, повторная попытка, данные и событие, которое указывает имя), разделенных разрывами строк.
Комментарии также поддерживаются.
Спецификация никоим образом не ограничивает формат полезных данных; мы можем использовать простую строку
или более сложную структуру JSON или XML.
Последний момент, который мы должны принять во внимание, — это разница между использованием потоковой передачи SSE и WebSockets
.
В то время как WebSockets
предлагают полнодуплексную (двунаправленную) связь между сервером и клиентом, а SSE использует однонаправленную связь.
Кроме того, WebSockets
не является протоколом HTTP и, в отличие от SSE, не предлагает стандартов обработки ошибок.
5. Вывод
Подводя итог, в этой статье мы изучили основные концепции потоковой передачи SSE, которая, несомненно, является отличным ресурсом, который позволит нам создавать системы следующего поколения.
Теперь у нас есть отличная возможность понять, что происходит под капотом, когда мы используем этот протокол.
Кроме того, мы дополнили теорию несколькими простыми примерами, которые можно найти в нашем репозитории на Github .