1. Обзор
В этой быстрой статье мы познакомим вас с реакторной шиной , настроив реальный сценарий для реактивного, управляемого событиями приложения.
ПРИМЕЧАНИЕ. Проект реактора-шины был удален в Reactor 3.x: заархивированный репозиторий реактора-шины .
2. Основы Project Reactor
2.1. Почему Реактор?
Современным приложениям необходимо иметь дело с огромным количеством одновременных запросов и обрабатывать значительный объем данных. Стандартного блокирующего кода уже недостаточно для выполнения этих требований.
Шаблон реактивного проектирования — это архитектурный подход, основанный на событиях, для асинхронной обработки большого объема одновременных запросов на обслуживание, поступающих от одного или нескольких обработчиков служб.
Project Reactor основан на этом шаблоне и имеет четкую и амбициозную цель создания неблокирующих реактивных приложений на JVM .
2.2. Примеры сценариев
Прежде чем мы начнем, вот несколько интересных сценариев, в которых использование реактивного архитектурного стиля имеет смысл, просто чтобы понять, где мы можем его применить:
- Службы уведомлений для крупной платформы онлайн-покупок, такой как Amazon.
- Огромные услуги по обработке транзакций для банковского сектора
- Компании по торговле акциями, где цены на акции меняются одновременно
3. Зависимости Maven
Давайте начнем использовать Project Reactor Bus, добавив следующую зависимость в наш pom.xml:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>
Мы можем проверить последнюю версию реактора-шины
в Maven Central .
4. Создание демонстрационного приложения
Чтобы лучше понять преимущества подхода на основе реактора, давайте рассмотрим практический пример.
Мы создадим простое приложение, отвечающее за отправку уведомлений пользователям платформы онлайн-покупок. Например, если пользователь размещает новый заказ, приложение отправляет подтверждение заказа по электронной почте или SMS.
Типичная синхронная реализация, естественно, будет ограничена пропускной способностью службы электронной почты или SMS. Поэтому всплески трафика, такие как праздники, как правило, будут проблематичными.
Используя реактивный подход, мы можем спроектировать нашу систему так, чтобы она была более гибкой и лучше адаптировалась к сбоям или тайм-аутам, которые могут возникнуть во внешних системах, таких как серверы шлюзов.
Давайте посмотрим на приложение — начиная с более традиционных аспектов и переходя к более реактивным конструкциям.
4.1. Простой POJO
Во-первых, давайте создадим класс POJO для представления данных уведомления:
public class NotificationData {
private long id;
private String name;
private String email;
private String mobile;
// getter and setter methods
}
4.2. Сервисный уровень
Давайте теперь определим простой сервисный слой:
public interface NotificationService {
void initiateNotification(NotificationData notificationData)
throws InterruptedException;
}
И реализация, имитирующая длительную операцию:
@Service
public class NotificationServiceimpl implements NotificationService {
@Override
public void initiateNotification(NotificationData notificationData)
throws InterruptedException {
System.out.println("Notification service started for "
+ "Notification ID: " + notificationData.getId());
Thread.sleep(5000);
System.out.println("Notification service ended for "
+ "Notification ID: " + notificationData.getId());
}
}
Обратите внимание: чтобы проиллюстрировать реальный сценарий отправки сообщений через шлюз SMS или электронной почты, мы намеренно вводим пятисекундную задержку в методе InitialNotification с помощью Thread.sleep
(5000).
Следовательно, когда поток достигает службы, он будет заблокирован на пять секунд.
4.3. Потребитель
Давайте теперь перейдем к более реактивным аспектам нашего приложения и реализуем потребителя, который затем сопоставим с шиной событий реактора:
@Service
public class NotificationConsumer implements
Consumer<Event<NotificationData>> {
@Autowired
private NotificationService notificationService;
@Override
public void accept(Event<NotificationData> notificationDataEvent) {
NotificationData notificationData = notificationDataEvent.getData();
try {
notificationService.initiateNotification(notificationData);
} catch (InterruptedException e) {
// ignore
}
}
}
Как мы видим, созданный нами потребитель реализует интерфейс Consumer<T> .
Основная логика находится в методе accept .
Это аналогичный подход, который мы можем встретить в типичной реализации прослушивателя Spring .
4.4. Контроллер
Наконец, теперь, когда мы можем потреблять события, давайте также сгенерируем их.
Мы собираемся сделать это в простом контроллере:
@Controller
public class NotificationController {
@Autowired
private EventBus eventBus;
@GetMapping("/startNotification/{param}")
public void startNotification(@PathVariable Integer param) {
for (int i = 0; i < param; i++) {
NotificationData data = new NotificationData();
data.setId(i);
eventBus.notify("notificationConsumer", Event.wrap(data));
System.out.println(
"Notification " + i + ": notification task submitted successfully");
}
}
}
Это говорит само за себя — здесь мы посылаем события через EventBus
.
Например, если клиент переходит по URL-адресу со значением параметра десять, то десять событий будут отправлены через шину событий.
4.5. Конфигурация Java
Давайте теперь соберем все вместе и создадим простое приложение Spring Boot .
Во- первых, нам нужно настроить компоненты EventBus
и Environment
:
@Configuration
public class Config {
@Bean
public Environment env() {
return Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
public EventBus createEventBus(Environment env) {
return EventBus.create(env, Environment.THREAD_POOL);
}
}
В нашем случае мы создаем экземпляр EventBus
с пулом потоков по умолчанию, доступным в среде .
В качестве альтернативы мы можем использовать настроенный экземпляр Dispatcher :
EventBus evBus = EventBus.create(
env,
Environment.newDispatcher(
REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,
DispatcherType.THREAD_POOL_EXECUTOR));
Теперь мы готовы создать основной код приложения:
import static reactor.bus.selector.Selectors.$;
@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {
@Autowired
private EventBus eventBus;
@Autowired
private NotificationConsumer notificationConsumer;
@Override
public void run(String... args) throws Exception {
eventBus.on($("notificationConsumer"), notificationConsumer);
}
public static void main(String[] args) {
SpringApplication.run(NotificationApplication.class, args);
}
}
В нашем методе запуска
мы регистрируем уведомлениеConsumer
, которое будет запускаться, когда уведомление соответствует заданному селектору .
Обратите внимание, как мы используем статический импорт атрибута $
для создания объекта Selector .
5. Протестируйте приложение
Давайте теперь создадим тест, чтобы увидеть наше приложение NotificationApplication
в действии:
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {
@LocalServerPort
private int port;
@Test
public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
}
}
Как мы видим, как только запрос выполняется, все десять задач отправляются мгновенно, без каких-либо блокировок . И после отправки события уведомлений обрабатываются параллельно.
Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9
Важно иметь в виду, что в нашем сценарии нет необходимости обрабатывать эти события в каком-либо определенном порядке.
6. Заключение
В этом кратком руководстве мы создали простое приложение, управляемое событиями . Мы также увидели, как начать писать более реактивный и неблокирующий код.
Однако этот сценарий лишь поверхностно затрагивает тему и представляет собой просто хорошую основу для начала экспериментов с реактивной парадигмой .
Как всегда, исходный код доступен на GitHub .