Перейти к основному содержимому

Руководство по Axon Framework

· 11 мин. чтения

1. Обзор

В этой статье мы рассмотрим Axon и то, как он помогает нам внедрять приложения с учетом CQRS (разделение ответственности за выполнение команд и запросов) и источников событий .

В этом руководстве будут использоваться как Axon Framework, так и Axon Server . Первый будет содержать нашу реализацию, а второй будет нашим специализированным решением для хранилища событий и маршрутизации сообщений.

Пример приложения, которое мы будем создавать, фокусируется на домене Order . Для этого мы будем использовать стандартные блоки CQRS и Event Sourcing, которые Axon предоставляет нам .

Обратите внимание, что многие общие концепции исходят прямо из DDD , что выходит за рамки данной статьи.

2. Зависимости Maven

Мы создадим приложение Axon/Spring Boot. Следовательно, нам нужно добавить последнюю зависимость axon-spring-boot-starter в наш pom.xml , а также зависимость axon-test для тестирования.

Чтобы использовать совпадающие версии, мы будем использовать axon-bom в нашем разделе управления зависимостями:

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-bom</artifactId>
<version>4.5.13</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

3. Аксон-сервер

Мы будем использовать Axon Server в качестве нашего хранилища событий и нашего специального решения для маршрутизации команд, событий и запросов.

В качестве хранилища событий он дает нам идеальные характеристики, необходимые для хранения событий. В этой статье рассказывается, почему это желательно.

В качестве решения для маршрутизации сообщений оно дает нам возможность соединять несколько экземпляров вместе, не сосредотачиваясь на настройке таких вещей, как RabbitMQ или тема Kafka, для обмена и отправки сообщений.

Axon Server можно скачать здесь . Поскольку это простой файл JAR, для его запуска достаточно следующей операции:

java -jar axonserver.jar

Это запустит один экземпляр Axon Server, доступный через localhost:8024 . Конечная точка предоставляет обзор подключенных приложений и сообщений, которые они могут обрабатывать, а также механизм запросов к хранилищу событий, содержащемуся в Axon Server.

Конфигурация Axon Server по умолчанию вместе с зависимостью axon-spring-boot-starter гарантирует, что наша служба Order будет автоматически подключаться к нему.

4. API службы заказа — команды

Мы настроим нашу службу заказов с учетом CQRS. Поэтому мы будем подчеркивать сообщения, которые проходят через наше приложение.

Во-первых, мы определим Команды, то есть выражения намерений. Сервис Order может обрабатывать три разных типа действий:

  1. Создание нового заказа
  2. Подтверждение заказа
  3. Доставка заказа

Естественно, будет три командных сообщения, с которыми может работать наш домен — CreateOrderCommand , ConfirmOrderCommand и ShipOrderCommand :

public class CreateOrderCommand {

@TargetAggregateIdentifier
private final String orderId;
private final String productId;

// constructor, getters, equals/hashCode and toString
}
public class ConfirmOrderCommand {

@TargetAggregateIdentifier
private final String orderId;

// constructor, getters, equals/hashCode and toString
}
public class ShipOrderCommand {

@TargetAggregateIdentifier
private final String orderId;

// constructor, getters, equals/hashCode and toString
}

Аннотация TargetAggregateIdentifier сообщает Axon, что аннотированное поле является идентификатором данного агрегата, на который должна быть нацелена команда. `` Мы кратко коснемся агрегатов позже в этой статье.

Также обратите внимание, что мы пометили поля в командах как окончательные. Это сделано намеренно, так как рекомендуется, чтобы любая реализация сообщения была неизменной .

5. API службы заказа — События

Наш агрегат будет обрабатывать команды , так как он отвечает за принятие решения о том, может ли Заказ быть создан, подтвержден или отправлен.

Он уведомит остальную часть приложения о своем решении, опубликовав событие. У нас будет три типа событий — OrderCreatedEvent, OrderConfirmedEvent и OrderShippedEvent :

public class OrderCreatedEvent {

private final String orderId;
private final String productId;

// default constructor, getters, equals/hashCode and toString
}
public class OrderConfirmedEvent {

private final String orderId;

// default constructor, getters, equals/hashCode and toString
}
public class OrderShippedEvent {

private final String orderId;

// default constructor, getters, equals/hashCode and toString
}

6. Командная модель — совокупность заказов

Теперь, когда мы смоделировали наш основной API с учетом команд и событий, мы можем приступить к созданию модели команд.

Aggregate является обычным компонентом модели команд и происходит от DDD. Другие платформы также используют эту концепцию, как, например, в этой статье о сохранении агрегатов DDD с помощью Spring.

Поскольку наша предметная область сосредоточена на работе с заказами, мы создадим OrderAggregate в качестве центра нашей модели команд.

6.1. Совокупный класс

Итак, давайте создадим наш базовый агрегатный класс:

@Aggregate
public class OrderAggregate {

@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;

@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
AggregateLifecycle.apply(new OrderCreatedEvent(command.getOrderId(), command.getProductId()));
}

@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
orderConfirmed = false;
}

protected OrderAggregate() { }
}

Аннотация Aggregate — это специальная аннотация Axon Spring, помечающая этот класс как агрегат. Он уведомит платформу о том, что для этого OrderAggregate необходимо создать экземпляры необходимых строительных блоков CQRS и Event Sourcing .

Поскольку агрегат будет обрабатывать команды, предназначенные для конкретного экземпляра агрегата, нам необходимо указать идентификатор с аннотацией AggregateIdentifier .

Наш агрегат начнет свой жизненный цикл после обработки команды CreateOrderCommand в « конструкторе обработки команд» OrderAggregate . Чтобы сообщить платформе, что данная функция может обрабатывать команды, мы добавим аннотацию CommandHandler .

При обработке CreateOrderCommand остальная часть приложения будет уведомлена о том, что заказ был создан путем публикации события OrderCreatedEvent. Чтобы опубликовать событие из агрегата, мы будем использовать AggregateLifecycle#apply(Object…) .

С этого момента мы действительно можем начать использовать Event Sourcing в качестве движущей силы для воссоздания агрегированного экземпляра из его потока событий.

Мы начинаем это с «события создания агрегата», OrderCreatedEvent , которое обрабатывается в аннотированной функции EventSourcingHandler для установки состояния orderId и orderConfirmed агрегата Order.

Также обратите внимание, что для получения агрегата на основе его событий Axon требуется конструктор по умолчанию.

6.2. Совокупные обработчики команд

Теперь, когда у нас есть базовый агрегат, мы можем приступить к реализации оставшихся обработчиков команд:

@CommandHandler 
public void handle(ConfirmOrderCommand command) {
if (orderConfirmed) {
return;
}
apply(new OrderConfirmedEvent(orderId));
}

@CommandHandler
public void handle(ShipOrderCommand command) {
if (!orderConfirmed) {
throw new UnconfirmedOrderException();
}
apply(new OrderShippedEvent(orderId));
}

@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
orderConfirmed = true;
}

Сигнатура наших обработчиков команд и источников событий просто указывает handle({the-command}) и on({the-event}) для сохранения краткого формата.

Кроме того, мы определили, что Заказ может быть подтвержден только один раз и отправлен, если он был подтвержден. Таким образом, мы проигнорируем команду в первом случае и создадим исключение UnconfirmedOrderException , если второе не так.

Это иллюстрирует необходимость того, чтобы обработчик источника OrderConfirmedEvent обновлял состояние orderConfirmed до true для агрегата Order.

7. Тестирование модели команд

Во-первых, нам нужно настроить наш тест, создав FixtureConfiguration для OrderAggregate :

private FixtureConfiguration<OrderAggregate> fixture;

@Before
public void setUp() {
fixture = new AggregateTestFixture<>(OrderAggregate.class);
}

Первый тестовый пример должен охватывать простейшую ситуацию. Когда агрегат обрабатывает CreateOrderCommand , он должен создать OrderCreatedEvent :

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.givenNoPriorActivity()
.when(new CreateOrderCommand(orderId, productId))
.expectEvents(new OrderCreatedEvent(orderId, productId));

Затем мы можем протестировать логику принятия решения о возможности отправки Заказа только в том случае, если он был подтвержден. Из-за этого у нас есть два сценария — в одном мы ожидаем исключение, а в другом ожидаем OrderShippedEvent .

Давайте посмотрим на первый сценарий, где мы ожидаем исключение:

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId))
.when(new ShipOrderCommand(orderId))
.expectException(UnconfirmedOrderException.class);

А теперь второй сценарий, где мы ожидаем OrderShippedEvent :

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId), new OrderConfirmedEvent(orderId))
.when(new ShipOrderCommand(orderId))
.expectEvents(new OrderShippedEvent(orderId));

8. Модель запроса — обработчики событий

На данный момент мы установили наш основной API с командами и событиями, и у нас есть командная модель нашей службы заказов CQRS, OrderAggregate .

Затем мы можем начать думать об одной из моделей запросов, которые должно обслуживать наше приложение .

Одной из таких моделей является Орден :

public class Order {

private final String orderId;
private final String productId;
private OrderStatus orderStatus;

public Order(String orderId, String productId) {
this.orderId = orderId;
this.productId = productId;
orderStatus = OrderStatus.CREATED;
}

public void setOrderConfirmed() {
this.orderStatus = OrderStatus.CONFIRMED;
}

public void setOrderShipped() {
this.orderStatus = OrderStatus.SHIPPED;
}

// getters, equals/hashCode and toString functions
}
public enum OrderStatus {
CREATED, CONFIRMED, SHIPPED
}

Мы будем обновлять эту модель на основе событий, распространяющихся через нашу систему. Компонент Spring Service для обновления нашей модели сделает свое дело:

@Service
public class OrdersEventHandler {

private final Map<String, Order> orders = new HashMap<>();

@EventHandler
public void on(OrderCreatedEvent event) {
String orderId = event.getOrderId();
orders.put(orderId, new Order(orderId, event.getProductId()));
}

// Event Handlers for OrderConfirmedEvent and OrderShippedEvent...
}

Поскольку мы использовали зависимость axon-spring-boot-starter для запуска нашего приложения Axon, платформа автоматически просканирует все bean-компоненты на наличие существующих функций обработки сообщений.

Поскольку OrdersEventHandler имеет аннотированные функции EventHandler для хранения и обновления Order , этот bean-компонент будет зарегистрирован фреймворком как класс, который должен получать события, не требуя какой-либо настройки с нашей стороны.

9. Модель запроса — обработчики запросов

Затем, чтобы запросить эту модель, чтобы, например, получить все заказы, мы должны сначала ввести сообщение Query в наш основной API:

public class FindAllOrderedProductsQuery { }

Во-вторых, нам нужно обновить OrdersEventHandler , чтобы он мог обрабатывать FindAllOrderedProductsQuery :

@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
return new ArrayList<>(orders.values());
}

Аннотированная функция QueryHandler будет обрабатывать FindAllOrderedProductsQuery и настроена на возврат List<Order> независимо от того, что аналогично любому запросу «найти все».

10. Собираем все вместе

Мы дополнили наш основной API командами, событиями и запросами, а также настроили нашу модель команд и запросов, используя модели OrderAggregate и Order .

Далее нужно связать свободные концы нашей инфраструктуры. Поскольку мы используем axon-spring-boot-starter , это автоматически устанавливает большую часть необходимой конфигурации.

Во- первых, поскольку мы хотим использовать Event Sourcing для нашего Aggregate, нам понадобится EventStore . Axon Server, который мы запустили на третьем шаге, заполнит эту брешь. ``

Во-вторых, нам нужен механизм для хранения нашей модели запроса Order . Для этого примера мы можем добавить h2 в качестве базы данных в памяти и spring-boot-starter-data-jpa для простоты использования:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>

10.1. Настройка конечной точки REST

Затем нам нужно получить доступ к нашему приложению, для которого мы будем использовать конечную точку REST, добавив зависимость spring-boot-starter-web :

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

С нашей конечной точки REST мы можем начать отправку команд и запросов:

@RestController
public class OrderRestEndpoint {

private final CommandGateway commandGateway;
private final QueryGateway queryGateway;

// Autowiring constructor and POST/GET endpoints
}

CommandGateway используется как механизм для отправки наших командных сообщений, а QueryGateway , в свою очередь, отправляет сообщения с запросами. Шлюзы предоставляют более простой и понятный API по сравнению с CommandBus и QueryBus , с которыми они соединяются.

С этого момента наша OrderRestEndpoint должна иметь конечную точку POST для создания, подтверждения и отправки заказа :

@PostMapping("/ship-order")
public CompletableFuture<Void> shipOrder() {
String orderId = UUID.randomUUID().toString();
return commandGateway.send(new CreateOrderCommand(orderId, "Deluxe Chair"))
.thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId)))
.thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId)));
}

Это завершает командную сторону нашего приложения CQRS. Обратите внимание, что шлюз возвращает CompletableFuture, что обеспечивает асинхронность.

Теперь все, что осталось, — это конечная точка GET для запроса всего заказа:

@GetMapping("/all-orders")
public CompletableFuture<List<Order>> findAllOrders() {
return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class));
}

В конечной точке GET мы используем QueryGateway для отправки запроса «точка-точка». При этом мы создаем FindAllOrderedProductsQuery по умолчанию , но нам также нужно указать ожидаемый тип возвращаемого значения.

Поскольку мы ожидаем , что будет возвращено несколько экземпляров Order , мы используем статическую функцию ResponseTypes#multipleInstancesOf(Class) . Таким образом, мы предоставили базовый вход в область запросов нашего сервиса Order.

Мы завершили настройку, поэтому теперь мы можем отправлять некоторые команды и запросы через наш REST-контроллер после запуска приложения OrderApplication.

Отправка POST в конечную точку /ship-order создаст экземпляр OrderAggregate , который будет публиковать события, которые, в свою очередь, сохранят/обновят наши заказы. GET из конечной точки /all-orders опубликует сообщение запроса, которое будет обработано OrdersEventHandler , который вернет все существующие заказы.

11. Заключение

В этой статье мы представили Axon Framework как мощную основу для создания приложения, использующего преимущества CQRS и Event Sourcing.

Мы реализовали простой сервис Order с использованием фреймворка, чтобы показать, как такое приложение должно быть структурировано на практике.

Наконец, Axon Server выдавал себя за наше хранилище событий и механизм маршрутизации сообщений, что значительно упрощало инфраструктуру.

Реализацию всех этих примеров и фрагментов кода можно найти на GitHub .

Если у вас возникнут дополнительные вопросы по этой теме, также посетите страницу «Обсудить AxonIQ» .