1. Обзор
Итак, в ряде других руководств мы говорили о BeanPostProcessor
. В этом уроке мы применим их в реальном примере, используя EventBus
Guava .
BeanPostProcessor
Spring дает нам доступ к жизненному циклу компонента Spring для изменения его конфигурации.
BeanPostProcessor
позволяет напрямую модифицировать сами bean-компоненты.
В этом уроке мы рассмотрим конкретный пример этих классов, интегрирующих EventBus
Guava .
2. Настройка
Во-первых, нам нужно настроить нашу среду. Давайте добавим зависимости Spring Context , Spring Expression и Guava в наш pom.xml
:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
Далее, давайте обсудим наши цели.
3. Цели и реализация
Для нашей первой цели мы хотим использовать EventBus
Guava для асинхронной передачи сообщений по различным аспектам системы `` .
Затем мы хотим автоматически регистрировать и отменять регистрацию объектов для событий при создании/уничтожении компонента вместо использования ручного метода, предоставляемого EventBus
.
Итак, теперь мы готовы начать кодирование!
Наша реализация будет состоять из класса-оболочки для EventBus
в Guava , пользовательской аннотации маркера, BeanPostProcessor
, объекта модели и bean-компонента для получения событий биржевой торговли из EventBus
. Кроме того, мы создадим тестовый пример для проверки желаемой функциональности.
3.1. Оболочка EventBus
Для этого мы определим оболочку EventBus
, чтобы предоставить некоторые статические методы для простой регистрации и отмены регистрации bean-компонентов для событий, которые будут использоваться BeanPostProcessor
:
public final class GlobalEventBus {
public static final String GLOBAL_EVENT_BUS_EXPRESSION
= "T(com.foreach.postprocessor.GlobalEventBus).getEventBus()";
private static final String IDENTIFIER = "global-event-bus";
private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus();
private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool());
private GlobalEventBus() {}
public static GlobalEventBus getInstance() {
return GlobalEventBus.GLOBAL_EVENT_BUS;
}
public static EventBus getEventBus() {
return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus;
}
public static void subscribe(Object obj) {
getEventBus().register(obj);
}
public static void unsubscribe(Object obj) {
getEventBus().unregister(obj);
}
public static void post(Object event) {
getEventBus().post(event);
}
}
Этот код предоставляет статические методы для доступа к GlobalEventBus
и базовой шине EventBus
, а также для регистрации и отмены регистрации событий и публикации событий. У него также есть выражение SpEL, используемое в качестве выражения по умолчанию в нашей пользовательской аннотации, чтобы определить, какой EventBus
мы хотим использовать.
3.2. Пользовательская аннотация маркера
Далее давайте определим пользовательскую аннотацию маркера, которая будет использоваться BeanPostProcessor
для идентификации bean-компонентов для автоматической регистрации/отмены регистрации для событий:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface Subscriber {
String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION;
}
3.3. BeanPostProcessor
Теперь мы определим BeanPostProcessor
, который будет проверять каждый компонент на наличие аннотации подписчика .
Этот класс также является DestructionAwareBeanPostProcessor,
который представляет собой интерфейс Spring, добавляющий обратный вызов перед уничтожением в BeanPostProcessor
. Если аннотация присутствует, мы зарегистрируем ее в EventBus
, идентифицированном выражением SpEL аннотации при инициализации компонента, и отменим ее регистрацию при уничтожении компонента:
public class GuavaEventBusBeanPostProcessor
implements DestructionAwareBeanPostProcessor {
Logger logger = LoggerFactory.getLogger(this.getClass());
SpelExpressionParser expressionParser = new SpelExpressionParser();
@Override
public void postProcessBeforeDestruction(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::unregister, "destruction");
}
@Override
public boolean requiresDestruction(Object bean) {
return true;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::register, "initialization");
return bean;
}
private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
// See implementation below
}
}
Приведенный выше код берет каждый bean-компонент и пропускает его через метод процесса
, определенный ниже. Он обрабатывает его после инициализации компонента и до его уничтожения. Метод requireDestruction
возвращает true по умолчанию, и мы сохраняем это поведение здесь, поскольку проверяем наличие аннотации @Subscriber в
обратном вызове postProcessBeforeDestruction
.
Давайте теперь посмотрим на метод процесса
:
private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
Object proxy = this.getTargetObject(bean);
Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
if (annotation == null)
return;
this.logger.info("{}: processing bean of type {} during {}",
this.getClass().getSimpleName(), proxy.getClass().getName(), action);
String annotationValue = annotation.value();
try {
Expression expression = this.expressionParser.parseExpression(annotationValue);
Object value = expression.getValue();
if (!(value instanceof EventBus)) {
this.logger.error(
"{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
return;
}
EventBus eventBus = (EventBus)value;
consumer.accept(eventBus, proxy);
} catch (ExpressionException ex) {
this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getName());
}
}
Этот код проверяет наличие нашей пользовательской аннотации маркера с именем Subscriber
и, если она присутствует, считывает выражение SpEL из его свойства value .
Затем выражение преобразуется в объект. Если это экземпляр EventBus,
мы применяем к компоненту параметр функции BiConsumer .
BiConsumer используется для регистрации и отмены регистрации bean-компонента в
EventBus
.
Реализация метода getTargetObject
выглядит следующим образом:
private Object getTargetObject(Object proxy) throws BeansException {
if (AopUtils.isJdkDynamicProxy(proxy)) {
try {
return ((Advised)proxy).getTargetSource().getTarget();
} catch (Exception e) {
throw new FatalBeanException("Error getting target of JDK proxy", e);
}
}
return proxy;
}
3.4. Объект модели StockTrade
Далее давайте определим наш объект модели StockTrade
:
public class StockTrade {
private String symbol;
private int quantity;
private double price;
private Date tradeDate;
// constructor
}
3.5. Приемник событий StockTradePublisher
Затем давайте определим класс слушателя, чтобы уведомить нас о получении сделки, чтобы мы могли написать наш тест:
@FunctionalInterface
public interface StockTradeListener {
void stockTradePublished(StockTrade trade);
}
Наконец, мы определим приемник для новых событий StockTrade
:
@Subscriber
public class StockTradePublisher {
Set<StockTradeListener> stockTradeListeners = new HashSet<>();
public void addStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.add(listener);
}
}
public void removeStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.remove(listener);
}
}
@Subscribe
@AllowConcurrentEvents
void handleNewStockTradeEvent(StockTrade trade) {
// publish to DB, send to PubNub, ...
Set<StockTradeListener> listeners;
synchronized (this.stockTradeListeners) {
listeners = new HashSet<>(this.stockTradeListeners);
}
listeners.forEach(li -> li.stockTradePublished(trade));
}
}
Приведенный выше код помечает этот класс как подписчик
событий Guava EventBus , а аннотация Guava
@Subscribe
помечает метод handleNewStockTradeEvent
как получатель событий. Тип получаемых событий зависит от класса единственного параметра метода; в этом случае мы будем получать события типа StockTrade
.
Аннотация @AllowConcurrentEvents
разрешает одновременный вызов этого метода. Как только мы получаем сделку, мы выполняем любую обработку, которую хотим, а затем уведомляем всех слушателей.
3.6. Тестирование
Теперь давайте завершим наш код интеграционным тестом, чтобы убедиться, что BeanPostProcessor
работает правильно. Во-первых, нам понадобится контекст Spring:
@Configuration
public class PostProcessorConfiguration {
@Bean
public GlobalEventBus eventBus() {
return GlobalEventBus.getInstance();
}
@Bean
public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() {
return new GuavaEventBusBeanPostProcessor();
}
@Bean
public StockTradePublisher stockTradePublisher() {
return new StockTradePublisher();
}
}
Теперь мы можем реализовать наш тест:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = PostProcessorConfiguration.class)
public class StockTradeIntegrationTest {
@Autowired
StockTradePublisher stockTradePublisher;
@Test
public void givenValidConfig_whenTradePublished_thenTradeReceived() {
Date tradeDate = new Date();
StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate);
AtomicBoolean assertionsPassed = new AtomicBoolean(false);
StockTradeListener listener = trade -> assertionsPassed
.set(this.verifyExact(stockTrade, trade));
this.stockTradePublisher.addStockTradeListener(listener);
try {
GlobalEventBus.post(stockTrade);
await().atMost(Duration.ofSeconds(2L))
.untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue());
} finally {
this.stockTradePublisher.removeStockTradeListener(listener);
}
}
boolean verifyExact(StockTrade stockTrade, StockTrade trade) {
return Objects.equals(stockTrade.getSymbol(), trade.getSymbol())
&& Objects.equals(stockTrade.getTradeDate(), trade.getTradeDate())
&& stockTrade.getQuantity() == trade.getQuantity()
&& stockTrade.getPrice() == trade.getPrice();
}
}
Приведенный выше тестовый код генерирует биржевую сделку и отправляет ее в GlobalEventBus
. Мы ждем не более двух секунд, пока действие завершится и нас уведомит о том, что сделка получена stockTradePublisher
. Кроме того, мы подтверждаем, что полученная сделка не была изменена в пути.
4. Вывод
В заключение, Spring BeanPostProcessor
позволяет нам настраивать сами bean-компоненты , предоставляя нам средства для автоматизации действий bean-компонентов, которые в противном случае нам пришлось бы выполнять вручную.
Как всегда, исходный код доступен на GitHub .