1. Обзор
В этой статье мы рассмотрим Axon
и то, как он помогает нам внедрять приложения с учетом CQRS
(разделение ответственности за выполнение команд и запросов) и источников событий
.
В этом руководстве будут использоваться как Axon Framework, так и Axon Server . Первый будет содержать нашу реализацию, а второй будет нашим специализированным решением для хранилища событий и маршрутизации сообщений.
Пример приложения, которое мы будем создавать, фокусируется на домене Order .
Для этого мы будем использовать стандартные блоки CQRS и Event Sourcing, которые Axon предоставляет нам .
Обратите внимание, что многие общие концепции исходят прямо из DDD ,
что выходит за рамки данной статьи.
2. Зависимости Maven
Мы создадим приложение Axon/Spring Boot. Следовательно, нам нужно добавить последнюю зависимость axon-spring-boot-starter
в наш pom.xml
, а также зависимость axon-test
для тестирования.
Чтобы использовать совпадающие версии, мы будем использовать axon-bom в нашем разделе управления зависимостями:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-bom</artifactId>
<version>4.5.13</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3. Аксон-сервер
Мы будем использовать Axon Server в качестве нашего хранилища событий и нашего специального решения для маршрутизации команд, событий и запросов.
В качестве хранилища событий он дает нам идеальные характеристики, необходимые для хранения событий. В этой статье рассказывается, почему это желательно.
В качестве решения для маршрутизации сообщений оно дает нам возможность соединять несколько экземпляров вместе, не сосредотачиваясь на настройке таких вещей, как RabbitMQ или тема Kafka, для обмена и отправки сообщений.
Axon Server можно скачать здесь . Поскольку это простой файл JAR, для его запуска достаточно следующей операции:
java -jar axonserver.jar
Это запустит один экземпляр Axon Server, доступный через localhost:8024
. Конечная точка предоставляет обзор подключенных приложений и сообщений, которые они могут обрабатывать, а также механизм запросов к хранилищу событий, содержащемуся в Axon Server.
Конфигурация Axon Server по умолчанию вместе с зависимостью axon-spring-boot-starter
гарантирует, что наша служба Order будет автоматически подключаться к нему.
4. API службы заказа — команды
Мы настроим нашу службу заказов с учетом CQRS. Поэтому мы будем подчеркивать сообщения, которые проходят через наше приложение.
Во-первых, мы определим Команды, то есть выражения намерений. Сервис Order может обрабатывать три разных типа действий:
- Создание нового заказа
- Подтверждение заказа
- Доставка заказа
Естественно, будет три командных сообщения, с которыми может работать наш домен — CreateOrderCommand
, ConfirmOrderCommand
и ShipOrderCommand
:
public class CreateOrderCommand {
@TargetAggregateIdentifier
private final String orderId;
private final String productId;
// constructor, getters, equals/hashCode and toString
}
public class ConfirmOrderCommand {
@TargetAggregateIdentifier
private final String orderId;
// constructor, getters, equals/hashCode and toString
}
public class ShipOrderCommand {
@TargetAggregateIdentifier
private final String orderId;
// constructor, getters, equals/hashCode and toString
}
Аннотация TargetAggregateIdentifier
сообщает Axon, что аннотированное поле является идентификатором данного агрегата, на который должна быть нацелена команда. `` Мы кратко коснемся агрегатов позже в этой статье.
Также обратите внимание, что мы пометили поля в командах как окончательные.
Это сделано намеренно, так как рекомендуется, чтобы любая
реализация сообщения была неизменной .
5. API службы заказа — События
Наш агрегат будет обрабатывать команды , так как он отвечает за принятие решения о том, может ли Заказ быть создан, подтвержден или отправлен.
Он уведомит остальную часть приложения о своем решении, опубликовав событие. У нас будет три типа событий — OrderCreatedEvent, OrderConfirmedEvent
и OrderShippedEvent
:
public class OrderCreatedEvent {
private final String orderId;
private final String productId;
// default constructor, getters, equals/hashCode and toString
}
public class OrderConfirmedEvent {
private final String orderId;
// default constructor, getters, equals/hashCode and toString
}
public class OrderShippedEvent {
private final String orderId;
// default constructor, getters, equals/hashCode and toString
}
6. Командная модель — совокупность заказов
Теперь, когда мы смоделировали наш основной API с учетом команд и событий, мы можем приступить к созданию модели команд.
Aggregate является обычным
компонентом модели команд и происходит от DDD. Другие платформы также используют эту концепцию, как, например, в этой статье о сохранении агрегатов DDD с помощью Spring.
Поскольку наша предметная область сосредоточена на работе с заказами, мы создадим OrderAggregate
в качестве центра нашей модели команд.
6.1. Совокупный класс
Итак, давайте создадим наш базовый агрегатный класс:
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
AggregateLifecycle.apply(new OrderCreatedEvent(command.getOrderId(), command.getProductId()));
}
@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
orderConfirmed = false;
}
protected OrderAggregate() { }
}
Аннотация Aggregate
— это специальная аннотация Axon Spring, помечающая этот класс как агрегат. Он уведомит платформу о том, что для этого OrderAggregate
необходимо создать экземпляры необходимых строительных блоков CQRS и Event Sourcing .
Поскольку агрегат будет обрабатывать команды, предназначенные для конкретного экземпляра агрегата, нам необходимо указать идентификатор с аннотацией AggregateIdentifier .
Наш агрегат начнет свой жизненный цикл после обработки команды CreateOrderCommand
в « конструкторе обработки команд» OrderAggregate
. Чтобы сообщить платформе, что данная функция может обрабатывать команды, мы добавим аннотацию CommandHandler
.
При обработке CreateOrderCommand
остальная часть приложения будет уведомлена о том, что заказ был создан путем публикации события OrderCreatedEvent.
Чтобы опубликовать событие из агрегата, мы будем использовать AggregateLifecycle#apply(Object…)
.
С этого момента мы действительно можем начать использовать Event Sourcing в качестве движущей силы для воссоздания агрегированного экземпляра из его потока событий.
Мы начинаем это с «события создания агрегата», OrderCreatedEvent
, которое обрабатывается в аннотированной функции EventSourcingHandler для установки состояния orderId
и
orderConfirmed агрегата
Order.
Также обратите внимание, что для получения агрегата на основе его событий Axon требуется конструктор по умолчанию.
6.2. Совокупные обработчики команд
Теперь, когда у нас есть базовый агрегат, мы можем приступить к реализации оставшихся обработчиков команд:
@CommandHandler
public void handle(ConfirmOrderCommand command) {
if (orderConfirmed) {
return;
}
apply(new OrderConfirmedEvent(orderId));
}
@CommandHandler
public void handle(ShipOrderCommand command) {
if (!orderConfirmed) {
throw new UnconfirmedOrderException();
}
apply(new OrderShippedEvent(orderId));
}
@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
orderConfirmed = true;
}
Сигнатура наших обработчиков команд и источников событий просто указывает handle({the-command})
и on({the-event})
для сохранения краткого формата.
Кроме того, мы определили, что Заказ может быть подтвержден только один раз и отправлен, если он был подтвержден. Таким образом, мы проигнорируем команду в первом случае и создадим исключение UnconfirmedOrderException
, если второе не так.
Это иллюстрирует необходимость того, чтобы обработчик источника OrderConfirmedEvent обновлял состояние
orderConfirmed
до true
для агрегата Order.
7. Тестирование модели команд
Во-первых, нам нужно настроить наш тест, создав FixtureConfiguration
для OrderAggregate
:
private FixtureConfiguration<OrderAggregate> fixture;
@Before
public void setUp() {
fixture = new AggregateTestFixture<>(OrderAggregate.class);
}
Первый тестовый пример должен охватывать простейшую ситуацию. Когда агрегат обрабатывает CreateOrderCommand
, он должен создать OrderCreatedEvent
:
String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.givenNoPriorActivity()
.when(new CreateOrderCommand(orderId, productId))
.expectEvents(new OrderCreatedEvent(orderId, productId));
Затем мы можем протестировать логику принятия решения о возможности отправки Заказа только в том случае, если он был подтвержден. Из-за этого у нас есть два сценария — в одном мы ожидаем исключение, а в другом ожидаем OrderShippedEvent
.
Давайте посмотрим на первый сценарий, где мы ожидаем исключение:
String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId))
.when(new ShipOrderCommand(orderId))
.expectException(UnconfirmedOrderException.class);
А теперь второй сценарий, где мы ожидаем OrderShippedEvent
:
String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId), new OrderConfirmedEvent(orderId))
.when(new ShipOrderCommand(orderId))
.expectEvents(new OrderShippedEvent(orderId));
8. Модель запроса — обработчики событий
На данный момент мы установили наш основной API с командами и событиями, и у нас есть командная модель нашей службы заказов CQRS, OrderAggregate
.
Затем мы можем начать думать об одной из моделей запросов, которые должно обслуживать наше приложение .
Одной из таких моделей является Орден
:
public class Order {
private final String orderId;
private final String productId;
private OrderStatus orderStatus;
public Order(String orderId, String productId) {
this.orderId = orderId;
this.productId = productId;
orderStatus = OrderStatus.CREATED;
}
public void setOrderConfirmed() {
this.orderStatus = OrderStatus.CONFIRMED;
}
public void setOrderShipped() {
this.orderStatus = OrderStatus.SHIPPED;
}
// getters, equals/hashCode and toString functions
}
public enum OrderStatus {
CREATED, CONFIRMED, SHIPPED
}
Мы будем обновлять эту модель на основе событий, распространяющихся через нашу систему. Компонент Spring Service
для обновления нашей модели сделает свое дело:
@Service
public class OrdersEventHandler {
private final Map<String, Order> orders = new HashMap<>();
@EventHandler
public void on(OrderCreatedEvent event) {
String orderId = event.getOrderId();
orders.put(orderId, new Order(orderId, event.getProductId()));
}
// Event Handlers for OrderConfirmedEvent and OrderShippedEvent...
}
Поскольку мы использовали зависимость axon-spring-boot-starter
для запуска нашего приложения Axon, платформа автоматически просканирует все bean-компоненты на наличие существующих функций обработки сообщений.
Поскольку OrdersEventHandler
имеет аннотированные функции EventHandler
для хранения и обновления Order
, этот bean-компонент будет зарегистрирован фреймворком как класс, который должен получать события, не требуя какой-либо настройки с нашей стороны.
9. Модель запроса — обработчики запросов
Затем, чтобы запросить эту модель, чтобы, например, получить все заказы, мы должны сначала ввести сообщение Query в наш основной API:
public class FindAllOrderedProductsQuery { }
Во-вторых, нам нужно обновить OrdersEventHandler
, чтобы он мог обрабатывать FindAllOrderedProductsQuery
:
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
return new ArrayList<>(orders.values());
}
Аннотированная функция QueryHandler
будет обрабатывать FindAllOrderedProductsQuery
и настроена на возврат List<Order>
независимо от того, что аналогично любому запросу «найти все».
10. Собираем все вместе
Мы дополнили наш основной API командами, событиями и запросами, а также настроили нашу модель команд и запросов, используя модели OrderAggregate
и Order
.
Далее нужно связать свободные концы нашей инфраструктуры. Поскольку мы используем axon-spring-boot-starter
, это автоматически устанавливает большую часть необходимой конфигурации.
Во- первых, поскольку мы хотим использовать Event Sourcing для нашего Aggregate, нам понадобится EventStore .
Axon Server, который мы запустили на третьем шаге, заполнит эту брешь. ``
Во-вторых, нам нужен механизм для хранения нашей модели запроса Order .
Для этого примера мы можем добавить h2
в качестве базы данных в памяти и spring-boot-starter-data-jpa
для простоты использования:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
10.1. Настройка конечной точки REST
Затем нам нужно получить доступ к нашему приложению, для которого мы будем использовать конечную точку REST, добавив зависимость spring-boot-starter-web :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
С нашей конечной точки REST мы можем начать отправку команд и запросов:
@RestController
public class OrderRestEndpoint {
private final CommandGateway commandGateway;
private final QueryGateway queryGateway;
// Autowiring constructor and POST/GET endpoints
}
CommandGateway используется
как механизм для отправки наших командных сообщений, а QueryGateway
, в свою очередь, отправляет сообщения с запросами. Шлюзы предоставляют более простой и понятный API по сравнению с CommandBus
и QueryBus
, с которыми они соединяются.
С этого момента наша OrderRestEndpoint
должна иметь конечную точку POST для создания, подтверждения и отправки заказа :
@PostMapping("/ship-order")
public CompletableFuture<Void> shipOrder() {
String orderId = UUID.randomUUID().toString();
return commandGateway.send(new CreateOrderCommand(orderId, "Deluxe Chair"))
.thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId)))
.thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId)));
}
Это завершает командную сторону нашего приложения CQRS. Обратите внимание, что шлюз возвращает CompletableFuture, что обеспечивает асинхронность.
Теперь все, что осталось, — это конечная точка GET для запроса всего заказа:
@GetMapping("/all-orders")
public CompletableFuture<List<Order>> findAllOrders() {
return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class));
}
В конечной точке GET мы используем QueryGateway
для отправки запроса «точка-точка». При этом мы создаем FindAllOrderedProductsQuery
по умолчанию , но нам также нужно указать ожидаемый тип возвращаемого значения.
Поскольку мы ожидаем , что будет возвращено несколько экземпляров Order
, мы используем статическую функцию ResponseTypes#multipleInstancesOf(Class) .
Таким образом, мы предоставили базовый вход в область запросов нашего сервиса Order.
Мы завершили настройку, поэтому теперь мы можем отправлять некоторые команды и запросы через наш REST-контроллер после запуска приложения OrderApplication.
Отправка POST в конечную точку /ship-order
создаст экземпляр OrderAggregate
, который будет публиковать события, которые, в свою очередь, сохранят/обновят наши заказы.
GET из конечной точки /all-orders
опубликует сообщение запроса, которое будет обработано OrdersEventHandler
, который вернет все существующие заказы.
11. Заключение
В этой статье мы представили Axon Framework как мощную основу для создания приложения, использующего преимущества CQRS и Event Sourcing.
Мы реализовали простой сервис Order с использованием фреймворка, чтобы показать, как такое приложение должно быть структурировано на практике.
Наконец, Axon Server выдавал себя за наше хранилище событий и механизм маршрутизации сообщений, что значительно упрощало инфраструктуру.
Реализацию всех этих примеров и фрагментов кода можно найти на GitHub .
Если у вас возникнут дополнительные вопросы по этой теме, также посетите страницу «Обсудить AxonIQ» .