Перейти к основному содержимому

Ratpack с RxJava

· 4 мин. чтения

1. Введение

RxJava — одна из самых популярных библиотек реактивного программирования.

А Ratpack — это набор библиотек Java для создания компактных и мощных веб-приложений, построенных на Netty.

В этом руководстве мы обсудим включение RxJava в приложение Ratpack для создания красивого реактивного веб-приложения.

2. Зависимости Maven

Теперь нам сначала нужны зависимости ratpack-core и ratpack-rx :

<dependency>
<groupId>io.ratpack</groupId>
<artifactId>ratpack-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.ratpack</groupId>
<artifactId>ratpack-rx</artifactId>
<version>1.6.0</version>
</dependency>

Обратите внимание, кстати, что ratpack-rx импортирует для нас зависимость rxjava .

3. Первоначальная настройка

RxJava поддерживает интеграцию сторонних библиотек, используя свою систему плагинов. Таким образом, мы можем включить различные стратегии выполнения в модель выполнения RxJava.

Ratpack подключается к этой модели выполнения через RxRatpack , который мы инициализируем при запуске:

RxRatpack.initialise();

Теперь важно отметить, что метод нужно вызывать только один раз за каждый запуск JVM .

В результате мы сможем сопоставлять Observable RxJava с типами Promise RxRatpack и наоборот.

4. От Observable к Promise s

Мы можем преобразовать Observable в RxJava в Ratpack Promise.

Однако есть небольшое несоответствие. Видите ли, Promise выдает одно значение, а Observable может выдавать их поток.

RxRatpack справляется с этим, предлагая два разных метода: promiseSingle() и promise().

Итак, давайте предположим, что у нас есть служба с именем MovieService , которая выдает одно обещание в getMovie(). Мы бы использовали promiseSingle() , так как знаем, что он выдаст только один раз:

Handler movieHandler = (ctx) -> {
MovieService movieSvc = ctx.get(MovieService.class);
Observable<Movie> movieObs = movieSvc.getMovie();
RxRatpack.promiseSingle(movieObs)
.then(movie -> ctx.render(Jackson.json(movie)));
};

С другой стороны, если getMovies() может возвращать поток результатов фильмов, мы бы использовали promise() :

Handler moviesHandler = (ctx) -> {
MovieService movieSvc = ctx.get(MovieService.class);
Observable<Movie> movieObs = movieSvc.getMovies();
RxRatpack.promise(movieObs)
.then(movie -> ctx.render(Jackson.json(movie)));
};

Затем мы можем добавить эти обработчики на наш сервер Ratpack, как обычно:

RatpackServer.start(def -> def.registryOf(rSpec -> rSpec.add(MovieService.class, new MovieServiceImpl()))
.handlers(chain -> chain
.get("movie", movieHandler)
.get("movies", moviesHandler)));

5. Promises для Observable s

И наоборот, мы можем сопоставить тип Promise в Ratpack обратно с RxJava Observable . ** **

RxRatpack снова имеет два метода: Observ( ) иObservEach ().

В этом случае мы представим, что у нас есть служба фильмов, которая возвращает Promise вместо Observable .

С нашим getMovie() мы бы использовалиObserv () :

Handler moviePromiseHandler = ctx -> {
MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
Promise<Movie> moviePromise = promiseSvc.getMovie();
RxRatpack.observe(moviePromise)
.subscribe(movie -> ctx.render(Jackson.json(movie)));
};

И когда мы получим список, как с getMovies() , мы будем использоватьObservEach () :

Handler moviesPromiseHandler = ctx -> {
MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
Promise<List<Movie>> moviePromises = promiseSvc.getMovies();
RxRatpack.observeEach(moviePromises)
.toList()
.subscribe(movie -> ctx.render(Jackson.json(movie)));
};

Затем, опять же, мы можем добавить обработчики, как и ожидалось:

RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
.add(MoviePromiseService.class, new MoviePromiseServiceImpl()))
.handlers(chain -> chain
.get("movie", moviePromiseHandler)
.get("movies", moviesPromiseHandler)));

6. Параллельная обработка

RxRatpack поддерживает параллелизм с помощью методов fork() и forkEach() .

И это следует за образцом, который мы уже видели с каждым.

fork() берет один Observable и распараллеливает его выполнение в другом вычислительном потоке . Затем он автоматически привязывает данные обратно к исходному выполнению.

С другой стороны, forkEach() делает то же самое для каждого элемента, испускаемого потоком значений Observable .

Давайте на мгновение представим, что мы хотим извлечь выгоду из наших названий фильмов, и что это дорогостоящая операция.

Проще говоря, мы можем использовать forkEach() , чтобы перенести выполнение каждого из них на пул потоков:

Observable<Movie> movieObs = movieSvc.getMovies();
Observable<String> upperCasedNames = movieObs.compose(RxRatpack::forkEach)
.map(movie -> movie.getName().toUpperCase())
.serialize();

7. Неявная обработка ошибок

Наконец, неявная обработка ошибок — одна из ключевых особенностей интеграции RxJava.

По умолчанию наблюдаемые последовательности RxJava перенаправят любое исключение обработчику исключений контекста выполнения. По этой причине обработчики ошибок не нужно определять в наблюдаемых последовательностях.

Итак, мы можем настроить Ratpack для обработки этих ошибок, вызванных RxJava.

Допустим, например, что мы хотели, чтобы каждая ошибка печаталась в ответе HTTP.

Обратите внимание, что исключение, которое мы выбрасываем через Observable , перехватывается и обрабатывается нашим ServerErrorHandler :

RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
.add(ServerErrorHandler.class, (ctx, throwable) -> {
ctx.render("Error caught by handler : " + throwable.getMessage());
}))
.handlers(chain -> chain
.get("error", ctx -> {
Observable.<String> error(new Exception("Error from observable")).subscribe(s -> {});
})));

Обратите внимание, что любая обработка ошибок на уровне подписчика имеет приоритет. Если бы наш Observable хотел выполнить собственную обработку ошибок, он мог бы это сделать, но поскольку этого не происходит, исключение просачивается до Ratpack.

8. Заключение

В этой статье мы рассказали о том, как настроить RxJava с помощью Ratpack.

Мы исследовали преобразование Observables в RxJava в типы Promise в Ratpack и наоборот. Мы также изучили функции параллелизма и неявной обработки ошибок, поддерживаемые интеграцией.

Все образцы кода, использованные в статье, можно найти на Github .