Перейти к основному содержимому

Шаблон корпоративной интеграции Wire Tap

· 4 мин. чтения

1. Обзор

В этом руководстве мы рассмотрим шаблон корпоративной интеграции Wire Tap (EIP), который помогает нам отслеживать сообщения, проходящие через систему.

Этот шаблон позволяет нам перехватывать сообщения, не потребляя их постоянно вне канала .

2. Схема отвода проводов

Wire Tap проверяет сообщения, которые передаются по каналу «точка-точка» . Он получает сообщение, делает копию и отправляет ее в пункт назначения Tap:

./3529b57c445c964b166c5826c030cf2c.png

Чтобы лучше понять это, давайте создадим приложение Spring Boot с ActiveMQ и Camel .

3. Зависимости Maven

Давайте добавим camel-spring-boot-dependencies :

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-dependencies</artifactId>
<version>${camel.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

Теперь добавим camel-spring-boot-starter :

<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>

Чтобы просмотреть сообщения, проходящие по маршруту, нам также потребуется включить ActiveMQ :

<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-activemq-starter</artifactId>
</dependency>

4. Обмен сообщениями

Создадим объект сообщения:

public class MyPayload implements Serializable {
private String value;
...
}

Мы отправим это сообщение `в direct:source` , чтобы инициировать маршрут:

try (CamelContext context = new DefaultCamelContext()) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
connectionFactory.setTrustAllPackages(true);
context.addComponent("direct", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
addRoute(context);

try (ProducerTemplate template = context.createProducerTemplate()) {
context.start();

MyPayload payload = new MyPayload("One");
template.sendBody("direct:source", payload);
Thread.sleep(10000);
} finally {
context.stop();
}
}

Далее мы добавим маршрут и нажмем пункт назначения.

5. Использование биржи

Мы будем использовать метод wireTap , чтобы установить URI конечной точки пункта назначения Tap. Camel не ждет ответа от wireTap, потому что он устанавливает шаблон обмена сообщениями на InOnly . Процессор Wire Tap обрабатывает его в отдельном потоке :

wireTap("direct:tap").delay(1000)

Узел Camel Wire Tap поддерживает два варианта подключения к обмену:

5.1. Традиционный проволочный кран

Давайте добавим традиционный маршрут Wire Tap:

RoutesBuilder traditionalWireTapRoute() {
return new RouteBuilder() {
public void configure() {

from("direct:source").wireTap("direct:tap")
.delay(1000)
.bean(MyBean.class, "addTwo")
.to("direct:destination");

from("direct:tap").log("Tap Wire route: received");

from("direct:destination").log("Output at destination: '${body}'");
}
};
}

Здесь Camel только скопирует Exchange , но не сделает глубокого клонирования . Все копии могут делиться объектами из исходного обмена.

При одновременной обработке нескольких сообщений существует вероятность повреждения окончательной полезной нагрузки . Мы можем создать глубокий клон полезной нагрузки перед передачей ее в пункт назначения Tap, чтобы предотвратить это. ****

5.2. Отправка нового обмена

EIP Wire Tap поддерживает Expression или Processor , предварительно заполненные копией обмена . Выражение можно использовать только для установки тела сообщения.

Вариант « Процессор » дает полную власть над тем, как заполняется обмен (настройка свойств, заголовков и т. д.).

Давайте реализуем глубокое клонирование в полезной нагрузке :

public class MyPayload implements Serializable {

private String value;
...
public MyPayload deepClone() {
MyPayload myPayload = new MyPayload(value);
return myPayload;
}
}

Теперь давайте реализуем класс Processor с копией исходного обмена в качестве входных данных:

public class MyPayloadClonePrepare implements Processor {

public void process(Exchange exchange) throws Exception {
MyPayload myPayload = exchange.getIn().getBody(MyPayload.class);
exchange.getIn().setBody(myPayload.deepClone());
exchange.getIn().setHeader("date", new Date());
}
}

Мы будем вызывать его с помощью onPrepare сразу после wireTap :

RoutesBuilder newExchangeRoute() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {

from("direct:source").wireTap("direct:tap")
.onPrepare(new MyPayloadClonePrepare())
.end()
.delay(1000);

from("direct:tap").bean(MyBean.class, "addThree");
}
};
}

6. Заключение

В этой статье мы реализовали шаблон Wire Tap для отслеживания сообщений, проходящих через определенные конечные точки сообщений. Используя wireTap Apache Camel , мы копируем сообщение и отправляем его в другую конечную точку, не изменяя существующий поток.

Camel поддерживает два способа подключения к обмену. В традиционном Wire Tap исходный обмен копируется. Во втором мы можем создать новую биржу. Мы можем заполнить этот новый обмен новыми значениями тела сообщения, используя Expression , или мы можем установить заголовки — и, возможно, тело — с помощью Processor .

Пример кода доступен на GitHub .