Перейти к основному содержимому

Механизм обратного давления в Spring WebFlux

· 7 мин. чтения

1. Введение

Spring WebFlux обеспечивает реактивное программирование для веб-приложений. Асинхронный и неблокирующий характер реактивного дизайна повышает производительность и использование памяти. Project Reactor предоставляет эти возможности для эффективного управления потоками данных.

Однако противодавление является распространенной проблемой в таких приложениях. В этом руководстве мы объясним, что это такое и как применить механизм обратного давления в Spring WebFlux, чтобы смягчить его.

2. Противодавление в реактивных потоках

Из-за неблокирующего характера реактивного программирования сервер не отправляет весь поток сразу. Он может передавать данные одновременно, как только они станут доступны. Таким образом, клиент ожидает меньше времени, чтобы получить и обработать события. Но есть проблемы, которые нужно преодолеть.

Противодавление в программных комплексах – это возможность перегрузить коммуникацию трафиком . Другими словами, источники информации заваливают потребителей данными, которые они не в состоянии обработать.

В конце концов, люди также применяют этот термин как механизм контроля и управления им. Это защитные действия, предпринимаемые системами для контроля сил, расположенных ниже по течению.

2.1. Что такое противодавление?

В Reactive Streams противодавление также определяет, как регулировать передачу элементов потока . Другими словами, контролируйте, сколько элементов может потреблять получатель.

Давайте используем пример, чтобы ясно описать, что это такое:

  • Система содержит три службы: издатель, потребитель и графический интерфейс пользователя (GUI).
  • Издатель отправляет 10000 событий в секунду Потребителю.
  • Потребитель обрабатывает их и отправляет результат в графический интерфейс.
  • Графический интерфейс отображает результаты для пользователей
  • Потребитель может обрабатывать только 7500 событий в секунду.

./242a1dbb1ec0197da4ea8a3880bcee72.png

При такой скорости потребитель не может управлять событиями ( противодавление) . Следовательно, система рухнет, и пользователи не увидят результатов.

2.2. Использование противодавления для предотвращения системных сбоев

Здесь рекомендуется применить какую-то стратегию обратного давления для предотвращения системных сбоев. Цель состоит в том, чтобы эффективно управлять полученными дополнительными событиями:

  • Управление отправляемым потоком данных будет первым вариантом . По сути, издателю нужно замедлить темп событий. Таким образом, потребитель не перегружен. К сожалению, это не всегда возможно, и нам нужно будет найти другие доступные варианты.
  • Буферизация дополнительного объема данных — второй вариант . При таком подходе потребитель временно сохраняет оставшиеся события, пока не сможет их обработать. Основным недостатком здесь является отвязка буфера, вызывающая сбой памяти.
  • Отбрасывая лишние события, теряя их из виду . Даже это решение далеко от идеального, с таким приемом система не рухнет

./ae910f25cae4cc57d8d7e8c46326acd8.png

2.3. Контроль противодавления

Мы сосредоточимся на управлении событиями, создаваемыми издателем. По сути, есть три стратегии, которым нужно следовать:

  • Отправляйте новые события только тогда, когда их запрашивает подписчик . Это стратегия извлечения для сбора элементов по запросу эмиттера.
  • Ограничение количества событий , получаемых на стороне клиента . Работая в качестве ограниченной стратегии push-уведомлений, издатель может отправить клиенту только максимальное количество элементов за раз.
  • Отмена потоковой передачи данных, когда потребитель не может обработать больше событий . В этом случае получатель может прервать передачу в любой момент и позже снова подписаться на поток.

./cd255e65b0e68113a13d00c32aed6b7e.png

3. Обработка обратного давления в Spring WebFlux

Spring WebFlux обеспечивает асинхронный неблокирующий поток реактивных потоков . Ответственным за противодавление в Spring WebFlux является Project Reactor . Он внутренне использует функциональные возможности Flux для применения механизмов управления событиями, создаваемыми эмиттером.

WebFlux использует управление потоком TCP для регулирования обратного давления в байтах. Но он не обрабатывает логические элементы, которые может получить потребитель. Давайте посмотрим на поток взаимодействия, происходящий под капотом:

  • Фреймворк WebFlux отвечает за преобразование событий в байты для их передачи/получения через TCP.
  • Может случиться так, что потребитель запускает и долго выполняющееся задание до запроса следующего логического элемента.
  • Пока получатель обрабатывает события, WebFlux ставит байты в очередь без подтверждения, потому что нет спроса на новые события.
  • Из-за природы протокола TCP, если есть новые события, издатель продолжит отправлять их в сеть.

./662c1e13a0e09946e89d1cd978c23a1b.png

В заключение, приведенная выше диаграмма показывает, что спрос на логические элементы может быть разным для потребителя и издателя. Spring WebFlux не идеально справляется с противодействием между сервисами, взаимодействующими как единая система. Он обрабатывает это с потребителем независимо, а затем с издателем таким же образом. Но это без учета логического спроса между двумя сервисами.

Таким образом, Spring WebFlux не справляется с противодавлением, как можно было бы ожидать . Давайте посмотрим в следующем разделе, как реализовать механизм обратного давления в Spring WebFlux!

4. Реализация механизма обратного давления с помощью Spring WebFlux

Мы будем использовать реализацию Flux для управления полученными событиями. Поэтому мы предоставим тело запроса и ответа с поддержкой обратного давления на стороне чтения и записи. Затем производитель замедлится или остановится, пока не освободится мощность потребителя. Давайте посмотрим, как это сделать!

4.1. Зависимости

Чтобы реализовать примеры, мы просто добавим тестовые зависимости Spring WebFlux starter и Reactor в наш pom.xml :

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

4.2. Запрос

Первый вариант — предоставить потребителю контроль над событиями, которые он может обрабатывать . Таким образом, издатель ждет, пока получатель не запросит новые события. Таким образом, клиент подписывается на Flux , а затем обрабатывает события в соответствии со своим запросом:

@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
Flux request = Flux.range(1, 50);

request.subscribe(
System.out::println,
err -> err.printStackTrace(),
() -> System.out.println("All 50 items have been successfully processed!!!"),
subscription -> {
for (int i = 0; i < 5; i++) {
System.out.println("Requesting the next 10 elements!!!");
subscription.request(10);
}
}
);

StepVerifier.create(request)
.expectSubscription()
.thenRequest(10)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.thenRequest(10)
.expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.thenRequest(10)
.expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
.thenRequest(10)
.expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
.thenRequest(10)
.expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
.verifyComplete();

При таком подходе излучатель никогда не подавляет приемник. Другими словами, клиент находится под контролем для обработки необходимых ему событий.

Мы проверим поведение производителя в отношении обратного давления с помощью StepVerifier . Мы ожидаем следующие n элементов только при вызове thenRequest(n) .

4.3. Ограничение

Второй вариант — использовать оператор limitRange() из Project Reactor. Это позволяет установить количество элементов для предварительной выборки за один раз . Одна интересная особенность заключается в том, что ограничение применяется, даже если подписчик запрашивает больше событий для обработки . Эмиттер разбивает события на куски, избегая потребления больше, чем лимит на каждый запрос:

@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
Flux<Integer> limit = Flux.range(1, 25);

limit.limitRate(10);
limit.subscribe(
value -> System.out.println(value),
err -> err.printStackTrace(),
() -> System.out.println("Finished!!"),
subscription -> subscription.request(15)
);

StepVerifier.create(limit)
.expectSubscription()
.thenRequest(15)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.expectNext(11, 12, 13, 14, 15)
.thenRequest(10)
.expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
.verifyComplete();
}

4.4. Отмена

Наконец, потребитель может отменить события для получения в любой момент . В этом примере мы будем использовать другой подход. Project Reactor позволяет реализовать собственный подписчик или расширить BaseSubscriber . Итак, давайте посмотрим, как получатель может в любой момент прервать прием новых событий, переопределив упомянутый класс:

@Test
public void whenCancel_thenSubscriptionFinished() {
Flux<Integer> cancel = Flux.range(1, 10).log();

cancel.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
request(3);
System.out.println(value);
cancel();
}
});

StepVerifier.create(cancel)
.expectNext(1, 2, 3)
.thenCancel()
.verify();
}

5. Вывод

В этом уроке мы показали, что такое обратное давление в реактивном программировании и как его избежать. Spring WebFlux поддерживает обратное давление через Project Reactor. Следовательно, он может обеспечить доступность, надежность и стабильность, когда издатель перегружает потребителя слишком большим количеством событий. Таким образом, это может предотвратить системные сбои из-за высокого спроса.

Как всегда, код доступен на GitHub .