Перейти к основному содержимому

Реактивные системы в Java

· 20 мин. чтения

1. Введение

В этом руководстве мы разберемся с основами создания реактивных систем на Java с использованием Spring и других инструментов и фреймворков.

В процессе мы обсудим, как реактивное программирование — это всего лишь движущая сила для создания реактивной системы. Это поможет нам понять обоснование создания реактивных систем и различных спецификаций, библиотек и стандартов, которые они вдохновили на этом пути.

2. Что такое реактивные системы?

За последние несколько десятилетий в технологическом ландшафте произошло несколько потрясений, которые привели к полной трансформации того, как мы видим ценность технологий. Компьютерный мир до появления Интернета и представить себе не мог, какими способами и средствами он изменит нашу сегодняшнюю жизнь.

С доступом к Интернету для масс и постоянно развивающимся опытом, который он обещает, архитекторы приложений должны быть начеку, чтобы удовлетворить их спрос.

По сути, это означает, что мы никогда не сможем разработать приложение так, как раньше. Высокоотзывчивое приложение больше не роскошь, а необходимость .

Это также связано со случайными сбоями и непредсказуемой нагрузкой. Потребность часа состоит не только в том, чтобы получить правильный результат, но и в том, чтобы получить его быстро! Очень важно обеспечить потрясающий пользовательский опыт, который мы обещаем предоставить.

Именно это создает потребность в архитектурном стиле, который может дать нам реактивные системы.

2.1. Реактивный манифест

Еще в 2013 году команда разработчиков под руководством Йонаса Бонера собралась вместе, чтобы определить набор основных принципов в документе, известном как Реактивный манифест . Именно это заложило основу архитектурного стиля для создания реактивных систем. С тех пор этот манифест вызвал большой интерес со стороны сообщества разработчиков.

По сути, этот документ описывает рецепт гибкости, слабосвязанности и масштабируемости реактивной системы . Это делает такие системы простыми в разработке, устойчивыми к сбоям и, что наиболее важно, очень быстро реагирующими, что является основой для невероятного пользовательского опыта.

Так что же это за секретный рецепт? Ну, это вряд ли какой-то секрет! Манифест определяет основные характеристики или принципы реактивной системы:

  • Отзывчивый : реактивная система должна обеспечивать быстрое и постоянное время отклика и, следовательно, постоянное качество обслуживания.
  • Отказоустойчивость : реактивная система должна оставаться отзывчивой в случае случайных сбоев посредством репликации и изоляции.
  • Эластичность : такая система должна оставаться отзывчивой при непредсказуемых рабочих нагрузках благодаря рентабельной масштабируемости.
  • Управляемый сообщениями : он должен полагаться на асинхронную передачу сообщений между компонентами системы.

Эти принципы кажутся простыми и разумными, но их не всегда легко реализовать в сложной корпоративной архитектуре. В этом руководстве мы разработаем пример системы на Java с учетом этих принципов!

3. Что такое реактивное программирование?

Прежде чем мы продолжим, важно понять разницу между реактивным программированием и реактивными системами. Мы используем оба эти термина довольно часто и легко неправильно понимаем одно за другое. Как мы видели ранее, реактивные системы являются результатом определенного архитектурного стиля.

Напротив, реактивное программирование — это парадигма программирования, в которой основное внимание уделяется разработке асинхронных и неблокирующих компонентов . Ядром реактивного программирования является поток данных, который мы можем наблюдать и реагировать на него, а также оказывать обратное давление. Это приводит к неблокирующему выполнению и, следовательно, к лучшей масштабируемости с меньшим количеством потоков выполнения.

Это не означает, что реактивные системы и реактивное программирование исключают друг друга. На самом деле реактивное программирование — это важный шаг к реализации реактивной системы, но это еще не все!

3.1. Реактивные потоки

Reactive Streams — это инициатива сообщества, которая началась еще в 2013 году, чтобы предоставить стандарт для асинхронной обработки потоков с неблокирующим противодавлением . Цель здесь состояла в том, чтобы определить набор интерфейсов, методов и протоколов, которые могут описывать необходимые операции и сущности.

С тех пор появилось несколько реализаций на нескольких языках программирования, которые соответствуют спецификации реактивных потоков. К ним относятся Akka Streams, Ratpack и Vert.x и многие другие.

3.2. Реактивные библиотеки для Java

Одной из первоначальных целей реактивных потоков было включение в качестве официальной стандартной библиотеки Java. В результате спецификация реактивных потоков семантически эквивалентна библиотеке Java Flow, представленной в Java 9.

Кроме того, есть несколько популярных вариантов реализации реактивного программирования на Java:

  • Реактивные расширения : широко известные как ReactiveX, они предоставляют API для асинхронного программирования с наблюдаемыми потоками. Они доступны для нескольких языков программирования и платформ, включая Java, где она известна как RxJava.
  • Project Reactor : это еще одна реактивная библиотека, основанная на спецификации реактивных потоков и предназначенная для создания не-приложений на JVM. Он также является основой реактивного стека в экосистеме Spring.

4. Простое приложение

Для целей этого руководства мы разработаем простое приложение на основе архитектуры микросервисов с минимальным внешним интерфейсом. В архитектуре приложения должно быть достаточно элементов для создания реактивной системы.

Для нашего приложения мы примем сквозное реактивное программирование и другие шаблоны и инструменты для достижения основных характеристик реактивной системы.

4.1. Архитектура

Мы начнем с определения простой архитектуры приложения, которая не обязательно демонстрирует характеристики реактивных систем . С этого момента мы будем вносить необходимые изменения для достижения этих характеристик одну за другой.

Итак, во-первых, давайте начнем с определения простой архитектуры:

./c035ef281f23af034f67e5c99339dc34.jpg

Это довольно простая архитектура, в которой есть множество микросервисов для облегчения коммерческого использования, где мы можем разместить заказ. У него также есть интерфейс для взаимодействия с пользователем, и все общение происходит как REST через HTTP. Более того, каждый микросервис управляет своими данными в отдельных базах данных, что называется «база данных для каждого сервиса».

Мы продолжим и создадим это простое приложение в следующих подразделах. Это будет нашей основой для понимания ошибочности этой архитектуры, а также способов и средств принятия принципов и практик, чтобы мы могли преобразовать ее в реактивную систему.

4.3. Микросервис инвентаризации

Микросервис инвентаризации будет отвечать за управление списком продуктов и их текущим запасом . Это также позволит изменять запас по мере обработки заказов. Мы будем использовать Spring Boot с MongoDB для разработки этой службы.

Начнем с определения контроллера для предоставления некоторых конечных точек:

@GetMapping
public List<Product> getAllProducts() {
return productService.getProducts();
}

@PostMapping
public Order processOrder(@RequestBody Order order) {
return productService.handleOrder(order);
}

@DeleteMapping
public Order revertOrder(@RequestBody Order order) {
return productService.revertOrder(order);
}

и сервис для инкапсуляции нашей бизнес-логики:

@Transactional
public Order handleOrder(Order order) {
order.getLineItems()
.forEach(l -> {
Product> p = productRepository.findById(l.getProductId())
.orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
if (p.getStock() >= l.getQuantity()) {
p.setStock(p.getStock() - l.getQuantity());
productRepository.save(p);
} else {
throw new RuntimeException("Product is out of stock: " + l.getProductId());
}
});
return order.setOrderStatus(OrderStatus.SUCCESS);
}

@Transactional
public Order revertOrder(Order order) {
order.getLineItems()
.forEach(l -> {
Product p = productRepository.findById(l.getProductId())
.orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
p.setStock(p.getStock() + l.getQuantity());
productRepository.save(p);
});
return order.setOrderStatus(OrderStatus.SUCCESS);
}

Обратите внимание, что мы сохраняем сущности внутри транзакции , что гарантирует отсутствие противоречивых состояний в случае исключений.

Помимо этого, нам также нужно будет определить объекты домена, интерфейс репозитория и кучу классов конфигурации, необходимых для правильной работы.

Но поскольку они в основном шаблонные, мы не будем их рассматривать, и на них можно сослаться в репозитории GitHub, представленном в последнем разделе этой статьи.

4.4. Микросервис доставки

Микросервис доставки также не будет сильно отличаться. Он будет отвечать за проверку возможности создания отгрузки для заказа и создание ее, если это возможно.

Как и раньше, мы определим контроллер для предоставления наших конечных точек, фактически только одну конечную точку:

@PostMapping
public Order process(@RequestBody Order order) {
return shippingService.handleOrder(order);
}

и сервис для инкапсуляции бизнес-логики, связанной с доставкой заказа:

public Order handleOrder(Order order) {
LocalDate shippingDate = null;
if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
&& LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
shippingDate = LocalDate.now().plusDays(1);
} else {
throw new RuntimeException("The current time is off the limits to place order.");
}
shipmentRepository.save(new Shipment()
.setAddress(order.getShippingAddress())
.setShippingDate(shippingDate));
return order.setShippingDate(shippingDate)
.setOrderStatus(OrderStatus.SUCCESS);
}

Наша простая служба доставки просто проверяет действительное временное окно для размещения заказов. Мы не будем обсуждать остальную часть шаблонного кода, как и раньше.

4.5. Заказать микросервис

Наконец, мы определим микросервис заказов, который помимо всего прочего будет отвечать за создание нового заказа . Интересно, что он также будет выступать в качестве службы оркестрации, где он будет связываться со службой инвентаризации и службой доставки заказа.

Давайте определим наш контроллер с необходимыми конечными точками:

@PostMapping
public Order create(@RequestBody Order order) {
Order processedOrder = orderService.createOrder(order);
if (OrderStatus.FAILURE.equals(processedOrder.getOrderStatus())) {
throw new RuntimeException("Order processing failed, please try again later.");
}
return processedOrder;
}
@GetMapping
public List<Order> getAll() {
return orderService.getOrders();
}

И сервис для инкапсуляции бизнес-логики, связанной с заказами:

public Order createOrder(Order order) {
boolean success = true;
Order savedOrder = orderRepository.save(order);
Order inventoryResponse = null;
try {
inventoryResponse = restTemplate.postForObject(
inventoryServiceUrl, order, Order.class);
} catch (Exception ex) {
success = false;
}
Order shippingResponse = null;
try {
shippingResponse = restTemplate.postForObject(
shippingServiceUrl, order, Order.class);
} catch (Exception ex) {
success = false;
HttpEntity<Order> deleteRequest = new HttpEntity<>(order);
ResponseEntity<Order> deleteResponse = restTemplate.exchange(
inventoryServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class);
}
if (success) {
savedOrder.setOrderStatus(OrderStatus.SUCCESS);
savedOrder.setShippingDate(shippingResponse.getShippingDate());
} else {
savedOrder.setOrderStatus(OrderStatus.FAILURE);
}
return orderRepository.save(savedOrder);
}

public List<Order> getOrders() {
return orderRepository.findAll();
}

Обработка заказов, когда мы организуем звонки в службы запасов и доставки, далека от идеала. Распределенные транзакции с несколькими микросервисами — сложная тема сама по себе и выходит за рамки этого руководства .

Однако позже в этом руководстве мы увидим, как реактивная система может в определенной степени избежать необходимости в распределенных транзакциях.

Как и прежде, мы не будем рассматривать остальную часть стандартного кода. Однако на это можно ссылаться в репозитории GitHub.

4.6. Внешний интерфейс

Давайте также добавим пользовательский интерфейс, чтобы завершить обсуждение. Пользовательский интерфейс будет основан на Angular и будет представлять собой простое одностраничное приложение.

Нам нужно создать простой компонент в Angular для управления созданием и получением заказов . Особое значение имеет та часть, где мы вызываем наш API для создания заказа:

createOrder() {
let headers = new HttpHeaders({'Content-Type': 'application/json'});
let options = {headers: headers}
this.http.post('http://localhost:8080/api/orders', this.form.value, options)
.subscribe(
(response) => {
this.response = response
},
(error) => {
this.error = error
}
)
}

Приведенный выше фрагмент кода предполагает, что данные заказа будут записаны в форму и доступны в рамках компонента . Angular предлагает фантастическую поддержку для создания простых и сложных форм с использованием реактивных форм и форм, управляемых шаблонами .

Также важна та часть, где мы получаем ранее созданные заказы:

getOrders() {
this.previousOrders = this.http.get(''http://localhost:8080/api/orders'')
}

Обратите внимание, что модуль Angular HTTP является асинхронным по своей природе и, следовательно, возвращает RxJS Observable s . Мы можем обработать ответ в нашем представлении, передав его через асинхронный канал:

<div class="container" *ngIf="previousOrders !== null">
<h2>Your orders placed so far:</h2>
<ul>
<li *ngFor="let order of previousOrders | async">
<p>Order ID: {{ order.id }}, Order Status: {{order.orderStatus}}, Order Message: {{order.responseMessage}}</p>
</li>
</ul>
</div>

Конечно, для работы Angular потребуются шаблоны, стили и конфигурации, но на них можно ссылаться в репозитории GitHub. Обратите внимание, что здесь мы объединили все в один компонент, что в идеале не следует делать.

Но для этого урока эти проблемы не входят в объем.

4.7. Развертывание приложения

Теперь, когда мы создали все отдельные части приложения, как нам их развернуть? Ну, мы всегда можем сделать это вручную. Но мы должны быть осторожны, так как вскоре это может стать утомительным.

В этом руководстве мы будем использовать Docker Compose для создания и развертывания нашего приложения на Docker Machine . Для этого нам потребуется добавить стандартный файл Dockerfile в каждую службу и создать файл Docker Compose для всего приложения.

Давайте посмотрим, как выглядит этот файл docker-compose.yml :

version: '3'
services:
frontend:
build: ./frontend
ports:
- "80:80"
order-service:
build: ./order-service
ports:
- "8080:8080"
inventory-service:
build: ./inventory-service
ports:
- "8081:8081"
shipping-service:
build: ./shipping-service
ports:
- "8082:8082"

Это довольно стандартное определение сервисов в Docker Compose и не требует особого внимания.

4.8. Проблемы с этой архитектурой

Теперь, когда у нас есть простое приложение с несколькими службами, взаимодействующими друг с другом, мы можем обсудить проблемы этой архитектуры. Есть то, что мы попытаемся рассмотреть в следующих разделах и в конечном итоге дойдем до состояния, в котором мы преобразовали бы наше приложение в реактивную систему!

Хотя это приложение далеко от программного обеспечения производственного уровня и есть несколько проблем, мы сосредоточимся на проблемах, связанных с мотивацией реактивных систем :

  • Сбой либо в службе инвентаризации, либо в службе доставки может иметь каскадный эффект.
  • Все обращения к внешним системам и базе данных блокируются по своей природе.
  • Развертывание не может автоматически обрабатывать сбои и колебания нагрузки.

5. Реактивное программирование

Блокирование вызовов в любой программе часто приводит к тому, что важные ресурсы просто ждут, пока что-то произойдет . К ним относятся вызовы базы данных, вызовы веб-служб и вызовы файловой системы. Если мы сможем освободить потоки выполнения от этого ожидания и предоставим механизм возврата после получения результатов, это приведет к гораздо лучшему использованию ресурсов.

Вот что дает нам принятие парадигмы реактивного программирования. Хотя для многих из этих вызовов можно переключиться на реактивную библиотеку, это возможно не для всех. Для нас, к счастью, Spring значительно упрощает использование реактивного программирования с API MongoDB и REST:

./62dcb8c30c57ba26fd4c861b259c31b0.jpg

Spring Data Mongo поддерживает реактивный доступ через Java-драйвер MongoDB Reactive Streams. Он предоставляет ReactiveMongoTemplate и ReactiveMongoRepository , оба из которых имеют обширную функциональность сопоставления.

Spring WebFlux предоставляет веб-инфраструктуру реактивного стека для Spring, обеспечивая неблокирующий код и противодействие Reactive Streams. Он использует Reactor в качестве своей реактивной библиотеки. Кроме того, он предоставляет WebClient для выполнения HTTP-запросов с обратным давлением Reactive Streams. Он использует Reactor Netty в качестве клиентской библиотеки HTTP.

5.1. Служба инвентаризации

Мы начнем с изменения наших конечных точек для создания реактивных издателей:

@GetMapping
public Flux<Product> getAllProducts() {
return productService.getProducts();
}
@PostMapping
public Mono<Order> processOrder(@RequestBody Order order) {
return productService.handleOrder(order);
}

@DeleteMapping
public Mono<Order> revertOrder(@RequestBody Order order) {
return productService.revertOrder(order);
}

Очевидно, нам придется внести необходимые изменения и в сервис:

@Transactional
public Mono<Order> handleOrder(Order order) {
return Flux.fromIterable(order.getLineItems())
.flatMap(l -> productRepository.findById(l.getProductId()))
.flatMap(p -> {
int q = order.getLineItems().stream()
.filter(l -> l.getProductId().equals(p.getId()))
.findAny().get()
.getQuantity();
if (p.getStock() >= q) {
p.setStock(p.getStock() - q);
return productRepository.save(p);
} else {
return Mono.error(new RuntimeException("Product is out of stock: " + p.getId()));
}
})
.then(Mono.just(order.setOrderStatus("SUCCESS")));
}

@Transactional
public Mono<Order> revertOrder(Order order) {
return Flux.fromIterable(order.getLineItems())
.flatMap(l -> productRepository.findById(l.getProductId()))
.flatMap(p -> {
int q = order.getLineItems().stream()
.filter(l -> l.getProductId().equals(p.getId()))
.findAny().get()
.getQuantity();
p.setStock(p.getStock() + q);
return productRepository.save(p);
})
.then(Mono.just(order.setOrderStatus("SUCCESS")));
}

5.2. Службы доставки

Точно так же мы изменим конечную точку нашей службы доставки:

@PostMapping
public Mono<Order> process(@RequestBody Order order) {
return shippingService.handleOrder(order);
}

И соответствующие изменения в службе для использования реактивного программирования:

public Mono<Order> handleOrder(Order order) {
return Mono.just(order)
.flatMap(o -> {
LocalDate shippingDate = null;
if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
&& LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
shippingDate = LocalDate.now().plusDays(1);
} else {
return Mono.error(new RuntimeException("The current time is off the limits to place order."));
}
return shipmentRepository.save(new Shipment()
.setAddress(order.getShippingAddress())
.setShippingDate(shippingDate));
})
.map(s -> order.setShippingDate(s.getShippingDate())
.setOrderStatus(OrderStatus.SUCCESS));
}

5.3. Заказать услугу

Нам придется внести аналогичные изменения в конечные точки службы заказов:

@PostMapping
public Mono<Order> create(@RequestBody Order order) {
return orderService.createOrder(order)
.flatMap(o -> {
if (OrderStatus.FAILURE.equals(o.getOrderStatus())) {
return Mono.error(new RuntimeException("Order processing failed, please try again later. " + o.getResponseMessage()));
} else {
return Mono.just(o);
}
});
}

@GetMapping
public Flux<Order> getAll() {
return orderService.getOrders();
}

Изменения в сервисе будут более сложными, поскольку нам придется использовать Spring WebClient для вызова инвентаризации и реактивных конечных точек доставки:

public Mono<Order> createOrder(Order order) {
return Mono.just(order)
.flatMap(orderRepository::save)
.flatMap(o -> {
return webClient.method(HttpMethod.POST)
.uri(inventoryServiceUrl)
.body(BodyInserters.fromValue(o))
.exchange();
})
.onErrorResume(err -> {
return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
.setResponseMessage(err.getMessage()));
})
.flatMap(o -> {
if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
return webClient.method(HttpMethod.POST)
.uri(shippingServiceUrl)
.body(BodyInserters.fromValue(o))
.exchange();
} else {
return Mono.just(o);
}
})
.onErrorResume(err -> {
return webClient.method(HttpMethod.POST)
.uri(inventoryServiceUrl)
.body(BodyInserters.fromValue(order))
.retrieve()
.bodyToMono(Order.class)
.map(o -> o.setOrderStatus(OrderStatus.FAILURE)
.setResponseMessage(err.getMessage()));
})
.map(o -> {
if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
return order.setShippingDate(o.getShippingDate())
.setOrderStatus(OrderStatus.SUCCESS);
} else {
return order.setOrderStatus(OrderStatus.FAILURE)
.setResponseMessage(o.getResponseMessage());
}
})
.flatMap(orderRepository::save);
}

public Flux<Order> getOrders() {
return orderRepository.findAll();
}

Такой вид оркестровки с реактивными API-интерфейсами — непростая задача, часто подверженная ошибкам, а также трудная для отладки . Мы увидим, как это можно упростить в следующем разделе.

5.4. Внешний интерфейс

Теперь, когда наши API-интерфейсы способны передавать события по мере их возникновения, вполне естественно, что мы должны иметь возможность использовать это и в нашем внешнем интерфейсе. К счастью, Angular поддерживает EventSource , интерфейс для Server-Sent Events .

Давайте посмотрим, как мы можем получить и обработать все наши предыдущие заказы в виде потока событий:

getOrderStream() {
return Observable.create((observer) => {
let eventSource = new EventSource('http://localhost:8080/api/orders')
eventSource.onmessage = (event) => {
let json = JSON.parse(event.data)
this.orders.push(json)
this._zone.run(() => {
observer.next(this.orders)
})
}
eventSource.onerror = (error) => {
if(eventSource.readyState === 0) {
eventSource.close()
this._zone.run(() => {
observer.complete()
})
} else {
this._zone.run(() => {
observer.error('EventSource error: ' + error)
})
}
}
})
}

6. Архитектура, управляемая сообщениями

Первая проблема, которую мы собираемся решить, связана с обменом данными между службами. Сейчас эти коммуникации синхронны, что создает несколько проблем . К ним относятся каскадные сбои, сложная оркестровка и распределенные транзакции, и это лишь некоторые из них.

Очевидный способ решить эту проблему — сделать эти коммуникации асинхронными. Брокер сообщений для облегчения связи между службами может помочь нам. Мы будем использовать Kafka в качестве нашего брокера сообщений и Spring для Kafka для создания и потребления сообщений:

./5d1319cf08df7b39ef4d57d86b8eef2e.jpg

Мы будем использовать единую тему для создания и использования сообщений о заказах с разными статусами заказов, на которые будут реагировать службы.

Давайте посмотрим, как нужно изменить каждую услугу.

6.1. Служба инвентаризации

Давайте начнем с определения производителя сообщений для нашей службы инвентаризации:

@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;

public void sendMessage(Order order) {
this.kafkaTemplate.send("orders", order);
}

Далее нам нужно определить получателя сообщений для службы инвентаризации, чтобы он реагировал на различные сообщения по теме:

@KafkaListener(topics = "orders", groupId = "inventory")
public void consume(Order order) throws IOException {
if (OrderStatus.RESERVE_INVENTORY.equals(order.getOrderStatus())) {
productService.handleOrder(order)
.doOnSuccess(o -> {
orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_SUCCESS));
})
.doOnError(e -> {
orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_FAILURE)
.setResponseMessage(e.getMessage()));
}).subscribe();
} else if (OrderStatus.REVERT_INVENTORY.equals(order.getOrderStatus())) {
productService.revertOrder(order)
.doOnSuccess(o -> {
orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_SUCCESS));
})
.doOnError(e -> {
orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_FAILURE)
.setResponseMessage(e.getMessage()));
}).subscribe();
}
}

Это также означает, что теперь мы можем безопасно удалить некоторые избыточные конечные точки из нашего контроллера. Этих изменений достаточно для достижения асинхронной связи в нашем приложении.

6.2. Службы доставки

Изменения в службе доставки относительно аналогичны тому, что мы сделали ранее со службой инвентаризации. Производитель сообщения тот же, а потребитель сообщения специфичен для логики доставки:

@KafkaListener(topics = "orders", groupId = "shipping")
public void consume(Order order) throws IOException {
if (OrderStatus.PREPARE_SHIPPING.equals(order.getOrderStatus())) {
shippingService.handleOrder(order)
.doOnSuccess(o -> {
orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_SUCCESS)
.setShippingDate(o.getShippingDate()));
})
.doOnError(e -> {
orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_FAILURE)
.setResponseMessage(e.getMessage()));
}).subscribe();
}
}

Теперь мы можем безопасно удалить все конечные точки в нашем контроллере, поскольку они нам больше не нужны.

6.3. Заказать услугу

Изменения в службе заказов будут немного более сложными, так как именно здесь мы выполняли всю оркестровку ранее.

Тем не менее, производитель сообщений остается неизменным, а потребитель сообщений берет на себя логику службы заказа:

@KafkaListener(topics = "orders", groupId = "orders")
public void consume(Order order) throws IOException {
if (OrderStatus.INITIATION_SUCCESS.equals(order.getOrderStatus())) {
orderRepository.findById(order.getId())
.map(o -> {
orderProducer.sendMessage(o.setOrderStatus(OrderStatus.RESERVE_INVENTORY));
return o.setOrderStatus(order.getOrderStatus())
.setResponseMessage(order.getResponseMessage());
})
.flatMap(orderRepository::save)
.subscribe();
} else if ("INVENTORY-SUCCESS".equals(order.getOrderStatus())) {
orderRepository.findById(order.getId())
.map(o -> {
orderProducer.sendMessage(o.setOrderStatus(OrderStatus.PREPARE_SHIPPING));
return o.setOrderStatus(order.getOrderStatus())
.setResponseMessage(order.getResponseMessage());
})
.flatMap(orderRepository::save)
.subscribe();
} else if ("SHIPPING-FAILURE".equals(order.getOrderStatus())) {
orderRepository.findById(order.getId())
.map(o -> {
orderProducer.sendMessage(o.setOrderStatus(OrderStatus.REVERT_INVENTORY));
return o.setOrderStatus(order.getOrderStatus())
.setResponseMessage(order.getResponseMessage());
})
.flatMap(orderRepository::save)
.subscribe();
} else {
orderRepository.findById(order.getId())
.map(o -> {
return o.setOrderStatus(order.getOrderStatus())
.setResponseMessage(order.getResponseMessage());
})
.flatMap(orderRepository::save)
.subscribe();
}
}

Потребитель здесь просто реагирует на сообщения о заказе с разными статусами заказа . Это то, что дает нам хореографию между различными сервисами.

Наконец, наша служба заказов также должна будет измениться, чтобы поддерживать эту хореографию:

public Mono<Order> createOrder(Order order) {
return Mono.just(order)
.flatMap(orderRepository::save)
.map(o -> {
orderProducer.sendMessage(o.setOrderStatus(OrderStatus.INITIATION_SUCCESS));
return o;
})
.onErrorResume(err -> {
return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
.setResponseMessage(err.getMessage()));
})
.flatMap(orderRepository::save);
}

Обратите внимание, что это намного проще, чем сервис, который мы должны были написать с реактивными конечными точками в предыдущем разделе. Асинхронная хореография часто приводит к гораздо более простому коду, хотя и достигается за счет возможной согласованности и сложной отладки и мониторинга . Как можно догадаться, наш интерфейс больше не будет сразу получать окончательный статус заказа.

7. Служба оркестрации контейнеров

Последняя часть головоломки, которую мы хотим решить, связана с развертыванием.

Чего мы хотим от приложения, так это достаточной избыточности и тенденции к автоматическому масштабированию вверх или вниз в зависимости от необходимости.

Мы уже добились контейнеризации сервисов через Docker и управляем зависимостями между ними через Docker Compose. Хотя это фантастические инструменты сами по себе, они не помогают нам достичь того, чего мы хотим.

Следовательно, нам нужна служба оркестрации контейнеров, которая может позаботиться об избыточности и масштабируемости нашего приложения . Хотя существует несколько вариантов, одним из популярных является Kubernetes. Kubernetes предоставляет нам независимый от поставщика облачных услуг способ добиться высокомасштабируемого развертывания контейнерных рабочих нагрузок.

Kubernetes оборачивает контейнеры, такие как Docker, в поды, которые являются наименьшей единицей развертывания. Кроме того, мы можем использовать Deployment для декларативного описания желаемого состояния.

Развертывание создает наборы реплик, которые внутренне отвечают за запуск модулей. Мы можем описать минимальное количество одинаковых подов, которые должны работать в любой момент времени. Это обеспечивает избыточность и, следовательно, высокую доступность.

Давайте посмотрим, как мы можем определить развертывание Kubernetes для наших приложений:

apiVersion: apps/v1
kind: Deployment
metadata:
name: inventory-deployment
spec:
replicas: 3
selector:
matchLabels:
name: inventory-deployment
template:
metadata:
labels:
name: inventory-deployment
spec:
containers:
- name: inventory
image: inventory-service-async:latest
ports:
- containerPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: shipping-deployment
spec:
replicas: 3
selector:
matchLabels:
name: shipping-deployment
template:
metadata:
labels:
name: shipping-deployment
spec:
containers:
- name: shipping
image: shipping-service-async:latest
ports:
- containerPort: 8082
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-deployment
spec:
replicas: 3
selector:
matchLabels:
name: order-deployment
template:
metadata:
labels:
name: order-deployment
spec:
containers:
- name: order
image: order-service-async:latest
ports:
- containerPort: 8080

Здесь мы объявляем наше развертывание для поддержки трех идентичных реплик модулей в любое время. Хотя это хороший способ добавить избыточность, этого может быть недостаточно для меняющихся нагрузок. Kubernetes предоставляет еще один ресурс, известный как Horizontal Pod Autoscaler , который может масштабировать количество модулей в развертывании на основе наблюдаемых показателей , таких как загрузка ЦП.

Обратите внимание, что мы только что рассмотрели аспекты масштабируемости приложения, размещенного в кластере Kubernetes. Это не обязательно означает, что сам базовый кластер является масштабируемым. Создание кластера Kubernetes с высокой доступностью — нетривиальная задача, которая выходит за рамки данного руководства.

8. Результирующая реактивная система

Теперь, когда мы внесли несколько улучшений в нашу архитектуру, возможно, пришло время сравнить это с определением реактивной системы. Мы продолжим оценку по четырем характеристикам реактивных систем, которые мы обсуждали ранее в этом руководстве:

  • Отзывчивость : принятие парадигмы реактивного программирования должно помочь нам достичь сквозного неблокирующего и, следовательно, отзывчивого приложения.
  • Устойчивость : развертывание Kubernetes с ReplicaSet желаемого количества модулей должно обеспечивать устойчивость к случайным сбоям.
  • Эластичность : кластер и ресурсы Kubernetes должны предоставить нам необходимую поддержку, чтобы быть эластичными перед лицом непредсказуемых нагрузок.
  • Управляемый сообщениями : асинхронная обработка всего взаимодействия между службами через брокера Kafka должна помочь нам в этом.

Хотя это выглядит весьма многообещающе, это далеко не конец. Честно говоря, поиски действительно реактивной системы должны быть постоянными улучшениями . Мы никогда не сможем упредить все, что может дать сбой в очень сложной инфраструктуре, где наше приложение является лишь небольшой частью.

Таким образом, реактивная система потребует надежности от каждой части, из которой состоит целое . От физической сети до инфраструктурных сервисов, таких как DNS, все они должны соответствовать друг другу, чтобы помочь нам достичь конечной цели.

Часто мы не можем управлять всеми этими деталями и предоставлять необходимые гарантии. И здесь управляемая облачная инфраструктура помогает облегчить нашу боль . Мы можем выбирать из множества сервисов, таких как IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service) и PaaS (Platform-as-a-Service), чтобы делегировать обязанности внешним сторонам. Это оставляет нас с ответственностью за наше приложение, насколько это возможно.

9. Заключение

В этом уроке мы рассмотрели основы реактивных систем и их сравнение с реактивным программированием. Мы создали простое приложение с несколькими микросервисами и выделили проблемы, которые собираемся решить с помощью реактивной системы.

Далее мы пошли дальше, представив реактивное программирование, архитектуру на основе сообщений и службу оркестрации контейнеров в архитектуре для реализации реактивной системы.

Наконец, мы обсудили получившуюся архитектуру и то, как она остается на пути к реактивной системе! Этот учебник не знакомит нас со всеми инструментами, фреймворками или шаблонами, которые могут помочь нам создать реактивную систему, но он знакомит нас с путешествием.

Как обычно, исходный код этой статьи можно найти на GitHub .