Перейти к основному содержимому

Spring MVC Streaming и обработка запросов SSE

· 4 мин. чтения

1. Введение

В этом простом руководстве демонстрируется использование нескольких асинхронных и потоковых объектов в Spring MVC 5.xx.

В частности, мы рассмотрим три ключевых класса:

  • ResponseBodyEmitter
  • SseEmitter
  • StreamingResponseBody

Также мы обсудим, как взаимодействовать с ними с помощью клиента JavaScript.

2. ResponseBodyEmitter

ResponseBodyEmitter обрабатывает асинхронные ответы.

Кроме того, он представляет родителя для ряда подклассов, один из которых мы рассмотрим подробнее ниже.

2.1. Сторона сервера

Лучше использовать ResponseBodyEmitter вместе с собственным выделенным асинхронным потоком и обернутым ResponseEntity (в который мы можем напрямую внедрить эмиттер ):

@Controller
public class ResponseBodyEmitterController {

private ExecutorService executor
= Executors.newCachedThreadPool();

@GetMapping("/rbe")
public ResponseEntity<ResponseBodyEmitter> handleRbe() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
executor.execute(() -> {
try {
emitter.send(
"/rbe" + " @ " + new Date(), MediaType.TEXT_PLAIN);
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return new ResponseEntity(emitter, HttpStatus.OK);
}
}

Таким образом, в приведенном выше примере мы можем обойти необходимость использования CompleteableFutures , более сложных асинхронных промисов или использования аннотации @Async .

Вместо этого мы просто объявляем наш асинхронный объект и помещаем его в новый поток , предоставляемый ExecutorService.

2.2. Сторона клиента

Для использования на стороне клиента мы можем использовать простой метод XHR и вызывать конечные точки нашего API, как в обычной операции AJAX:

var xhr = function(url) {
return new Promise(function(resolve, reject) {
var xmhr = new XMLHttpRequest();
//...
xmhr.open("GET", url, true);
xmhr.send();
//...
});
};

xhr('http://localhost:8080/javamvcasync/rbe')
.then(function(success){ //... });

3. SseEmitter

SseEmitter на самом деле является подклассом ResponseBodyEmitter и предоставляет дополнительную встроенную поддержку Server-Sent Event (SSE) .

3.1. Сторона сервера

Итак, давайте быстро взглянем на пример контроллера, использующего эту мощную сущность:

@Controller
public class SseEmitterController {
private ExecutorService nonBlockingService = Executors
.newCachedThreadPool();

@GetMapping("/sse")
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter();
nonBlockingService.execute(() -> {
try {
emitter.send("/sse" + " @ " + new Date());
// we could send more events
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
}

Довольно стандартная плата, но мы заметим несколько отличий между этим и нашим обычным контроллером REST:

  • Сначала мы возвращаем SseEmitter
  • Кроме того, мы помещаем основную информацию об ответе в отдельный поток .
  • Наконец, мы отправляем информацию об ответе, используя emitter.send().

3.2. Сторона клиента

На этот раз наш клиент работает немного по-другому, поскольку мы можем использовать постоянно подключенную библиотеку событий, отправленных сервером :

var sse = new EventSource('http://localhost:8080/javamvcasync/sse');
sse.onmessage = function (evt) {
var el = document.getElementById('sse');
el.appendChild(document.createTextNode(evt.data));
el.appendChild(document.createElement('br'));
};

4. StreamingResponseBody

Наконец, мы можем использовать StreamingResponseBody для прямой записи в OutputStream перед передачей записанной информации обратно клиенту с помощью ResponseEntity.

4.1. Сторона сервера

@Controller
public class StreamingResponseBodyController {

@GetMapping("/srb")
public ResponseEntity<StreamingResponseBody> handleRbe() {
StreamingResponseBody stream = out -> {
String msg = "/srb" + " @ " + new Date();
out.write(msg.getBytes());
};
return new ResponseEntity(stream, HttpStatus.OK);
}
}

4.2. Сторона клиента

Как и раньше, мы будем использовать обычный метод XHR для доступа к контроллеру выше:

var xhr = function(url) {
return new Promise(function(resolve, reject) {
var xmhr = new XMLHttpRequest();
//...
xmhr.open("GET", url, true);
xmhr.send();
//...
});
};

xhr('http://localhost:8080/javamvcasync/srb')
.then(function(success){ //... });

Далее, давайте взглянем на некоторые примеры успешного использования этих примеров.

5. Объединяем все вместе

После того, как мы успешно скомпилировали наш сервер и запустили наш клиент выше (получив доступ к предоставленному index.jsp ), мы должны увидеть следующее в нашем браузере:

./00236faf58ed1abbdcf83edd3484dbeb.png

И следующее в нашем терминале:

./5a7789cc340edf1edeec58da122e521b.png

Мы также можем вызывать конечные точки напрямую и видеть, как их потоковые ответы появляются в нашем браузере.

6. Заключение

Хотя Future и CompleteableFuture зарекомендовали себя надежными дополнениями к Java и Spring, теперь в нашем распоряжении есть несколько ресурсов для более адекватной обработки асинхронных и потоковых данных для высококонкурентных веб-приложений.

Наконец, ознакомьтесь с полными примерами кода на GitHub .