1. Введение
Spring WebFlux обеспечивает реактивное программирование для веб-приложений. Асинхронный и неблокирующий характер реактивного дизайна повышает производительность и использование памяти. Project Reactor предоставляет эти возможности для эффективного управления потоками данных.
Однако противодавление является распространенной проблемой в таких приложениях. В этом руководстве мы объясним, что это такое и как применить механизм обратного давления в Spring WebFlux, чтобы смягчить его.
2. Противодавление в реактивных потоках
Из-за неблокирующего характера реактивного программирования сервер не отправляет весь поток сразу. Он может передавать данные одновременно, как только они станут доступны. Таким образом, клиент ожидает меньше времени, чтобы получить и обработать события. Но есть проблемы, которые нужно преодолеть.
Противодавление в программных комплексах – это возможность перегрузить коммуникацию трафиком . Другими словами, источники информации заваливают потребителей данными, которые они не в состоянии обработать.
В конце концов, люди также применяют этот термин как механизм контроля и управления им. Это защитные действия, предпринимаемые системами для контроля сил, расположенных ниже по течению.
2.1. Что такое противодавление?
В Reactive Streams противодавление также определяет, как регулировать передачу элементов потока . Другими словами, контролируйте, сколько элементов может потреблять получатель.
Давайте используем пример, чтобы ясно описать, что это такое:
- Система содержит три службы: издатель, потребитель и графический интерфейс пользователя (GUI).
- Издатель отправляет 10000 событий в секунду Потребителю.
- Потребитель обрабатывает их и отправляет результат в графический интерфейс.
- Графический интерфейс отображает результаты для пользователей
- Потребитель может обрабатывать только 7500 событий в секунду.
При такой скорости потребитель не может управлять событиями ( противодавление) . Следовательно, система рухнет, и пользователи не увидят результатов.
2.2. Использование противодавления для предотвращения системных сбоев
Здесь рекомендуется применить какую-то стратегию обратного давления для предотвращения системных сбоев. Цель состоит в том, чтобы эффективно управлять полученными дополнительными событиями:
- Управление отправляемым потоком данных будет первым вариантом . По сути, издателю нужно замедлить темп событий. Таким образом, потребитель не перегружен. К сожалению, это не всегда возможно, и нам нужно будет найти другие доступные варианты.
- Буферизация дополнительного объема данных — второй вариант . При таком подходе потребитель временно сохраняет оставшиеся события, пока не сможет их обработать. Основным недостатком здесь является отвязка буфера, вызывающая сбой памяти.
- Отбрасывая лишние события, теряя их из виду . Даже это решение далеко от идеального, с таким приемом система не рухнет
2.3. Контроль противодавления
Мы сосредоточимся на управлении событиями, создаваемыми издателем. По сути, есть три стратегии, которым нужно следовать:
- Отправляйте новые события только тогда, когда их запрашивает подписчик . Это стратегия извлечения для сбора элементов по запросу эмиттера.
- Ограничение количества событий , получаемых на стороне клиента . Работая в качестве ограниченной стратегии push-уведомлений, издатель может отправить клиенту только максимальное количество элементов за раз.
- Отмена потоковой передачи данных, когда потребитель не может обработать больше событий . В этом случае получатель может прервать передачу в любой момент и позже снова подписаться на поток.
3. Обработка обратного давления в Spring WebFlux
Spring WebFlux обеспечивает асинхронный неблокирующий поток реактивных потоков . Ответственным за противодавление в Spring WebFlux является Project Reactor . Он внутренне использует функциональные возможности Flux для применения механизмов управления событиями, создаваемыми эмиттером.
WebFlux использует управление потоком TCP для регулирования обратного давления в байтах. Но он не обрабатывает логические элементы, которые может получить потребитель. Давайте посмотрим на поток взаимодействия, происходящий под капотом:
- Фреймворк WebFlux отвечает за преобразование событий в байты для их передачи/получения через TCP.
- Может случиться так, что потребитель запускает и долго выполняющееся задание до запроса следующего логического элемента.
- Пока получатель обрабатывает события, WebFlux ставит байты в очередь без подтверждения, потому что нет спроса на новые события.
- Из-за природы протокола TCP, если есть новые события, издатель продолжит отправлять их в сеть.
В заключение, приведенная выше диаграмма показывает, что спрос на логические элементы может быть разным для потребителя и издателя. Spring WebFlux не идеально справляется с противодействием между сервисами, взаимодействующими как единая система. Он обрабатывает это с потребителем независимо, а затем с издателем таким же образом. Но это без учета логического спроса между двумя сервисами.
Таким образом, Spring WebFlux не справляется с противодавлением, как можно было бы ожидать . Давайте посмотрим в следующем разделе, как реализовать механизм обратного давления в Spring WebFlux!
4. Реализация механизма обратного давления с помощью Spring WebFlux
Мы будем использовать реализацию Flux
для управления полученными событиями. Поэтому мы предоставим тело запроса и ответа с поддержкой обратного давления на стороне чтения и записи. Затем производитель замедлится или остановится, пока не освободится мощность потребителя. Давайте посмотрим, как это сделать!
4.1. Зависимости
Чтобы реализовать примеры, мы просто добавим тестовые зависимости Spring WebFlux starter и Reactor в наш pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
4.2. Запрос
Первый вариант — предоставить потребителю контроль над событиями, которые он может обрабатывать . Таким образом, издатель ждет, пока получатель не запросит новые события. Таким образом, клиент подписывается на Flux
, а затем обрабатывает события в соответствии со своим запросом:
@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
Flux request = Flux.range(1, 50);
request.subscribe(
System.out::println,
err -> err.printStackTrace(),
() -> System.out.println("All 50 items have been successfully processed!!!"),
subscription -> {
for (int i = 0; i < 5; i++) {
System.out.println("Requesting the next 10 elements!!!");
subscription.request(10);
}
}
);
StepVerifier.create(request)
.expectSubscription()
.thenRequest(10)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.thenRequest(10)
.expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.thenRequest(10)
.expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
.thenRequest(10)
.expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
.thenRequest(10)
.expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
.verifyComplete();
При таком подходе излучатель никогда не подавляет приемник. Другими словами, клиент находится под контролем для обработки необходимых ему событий.
Мы проверим поведение производителя в отношении обратного давления с помощью StepVerifier
. Мы ожидаем следующие n элементов только при вызове thenRequest(n)
.
4.3. Ограничение
Второй вариант — использовать оператор limitRange()
из Project Reactor. Это позволяет установить количество элементов для предварительной выборки за один раз . Одна интересная особенность заключается в том, что ограничение применяется, даже если подписчик запрашивает больше событий для обработки . Эмиттер разбивает события на куски, избегая потребления больше, чем лимит на каждый запрос:
@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
Flux<Integer> limit = Flux.range(1, 25);
limit.limitRate(10);
limit.subscribe(
value -> System.out.println(value),
err -> err.printStackTrace(),
() -> System.out.println("Finished!!"),
subscription -> subscription.request(15)
);
StepVerifier.create(limit)
.expectSubscription()
.thenRequest(15)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.expectNext(11, 12, 13, 14, 15)
.thenRequest(10)
.expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
.verifyComplete();
}
4.4. Отмена
Наконец, потребитель может отменить события для получения в любой момент . В этом примере мы будем использовать другой подход. Project Reactor позволяет реализовать собственный подписчик
или расширить BaseSubscriber
. Итак, давайте посмотрим, как получатель может в любой момент прервать прием новых событий, переопределив упомянутый класс:
@Test
public void whenCancel_thenSubscriptionFinished() {
Flux<Integer> cancel = Flux.range(1, 10).log();
cancel.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
request(3);
System.out.println(value);
cancel();
}
});
StepVerifier.create(cancel)
.expectNext(1, 2, 3)
.thenCancel()
.verify();
}
5. Вывод
В этом уроке мы показали, что такое обратное давление в реактивном программировании и как его избежать. Spring WebFlux поддерживает обратное давление через Project Reactor. Следовательно, он может обеспечить доступность, надежность и стабильность, когда издатель перегружает потребителя слишком большим количеством событий. Таким образом, это может предотвратить системные сбои из-за высокого спроса.
Как всегда, код доступен на GitHub .