1. Обзор
В этом руководстве представлены операторы map
и flatMap
в Project Reactor . Они определены в классах Mono
и Flux
для преобразования элементов при обработке потока.
В следующих разделах мы сосредоточимся на методах map
и flatMap
в классе Flux
. Одноименные в классе Mono
работают точно так же.
2. Зависимости Maven
Чтобы написать несколько примеров кода, нам понадобится зависимость ядра Reactor :
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.9.RELEASE</version>
</dependency>
3. Оператор карты
Теперь давайте посмотрим, как мы можем использовать оператор карты .
Метод Flux#map
ожидает один параметр Function
, который может быть таким простым, как:
Function<String, String> mapper = String::toUpperCase;
Этот преобразователь преобразует строку в ее версию в верхнем регистре. Мы можем применить его к потоку Flux :
Flux<String> inFlux = Flux.just("foreach", ".", "com");
Flux<String> outFlux = inFlux.map(mapper);
Данный преобразователь преобразует каждый элемент входного потока в новый элемент выходного, сохраняя порядок .
Докажем, что:
StepVerifier.create(outFlux)
.expectNext("FOREACH", ".", "COM")
.expectComplete()
.verify();
Обратите внимание, что функция картографа не выполняется при вызове метода карты .
Вместо этого он запускается в то время, когда мы подписываемся на поток .
4. Оператор плоской карты
Пришло время перейти к оператору flatMap
.
4.1. Пример кода
Подобно карте
, оператор flatMap
имеет единственный параметр типа Function
. Однако, в отличие от функции, работающей с map
, функция сопоставления flatMap
преобразует входной элемент в Publisher
, а не в обычный объект.
Вот пример:
Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split(""));
В этом случае функция сопоставления преобразует строку в ее версию в верхнем регистре, а затем разбивает ее на отдельные символы. Наконец, функция создает новый поток из этих символов.
Теперь мы можем передать данный преобразователь в метод flatMap
:
Flux<String> inFlux = Flux.just("foreach", ".", "com");
Flux<String> outFlux = inFlux.flatMap(mapper);
Операция плоского отображения, которую мы видели, создает три новых потока из восходящего потока с тремя строковыми элементами. После этого элементы из этих трех потоков разделяются и переплетаются, образуя еще один новый поток. Этот последний поток содержит символы из всех трех входных строк.
Затем мы можем подписаться на этот вновь сформированный поток, чтобы запустить конвейер и проверить вывод:
List<String> output = new ArrayList<>();
outFlux.subscribe(output::add);
assertThat(output).containsExactlyInAnyOrder("B", "A", "E", "L", "D", "U", "N", "G", ".", "C", "O", "M");
Обратите внимание, что из-за чередования элементов из разных источников их порядок на выходе может отличаться от того, что мы видим на входе .
4.2. Объяснение операций конвейера
Мы только что определили преобразователь, передали его оператору flatMap
и вызвали этот оператор в потоке. Пришло время углубиться в детали и понять, почему элементы в выходных данных могут быть не в порядке.
Во-первых, давайте проясним, что никакие операции не выполняются до тех пор, пока поток не будет подписан . Когда это происходит, конвейер выполняет и вызывает функцию сопоставления, переданную методу flatMap
.
В этот момент преобразователь выполняет необходимое преобразование элементов во входном потоке. Каждый из этих элементов может быть преобразован в несколько элементов, которые затем используются для создания нового потока . В нашем примере кода значение выражения
Flux.just(s.toUpperCase().split(""))
указывает на такой поток.
Как только новый поток, представленный экземпляром Publisher
, будет готов, flatMap
охотно подпишется. Оператор не ждет завершения работы издателя, прежде чем перейти к следующему потоку , что означает, что подписка не блокируется.
Поскольку конвейер обрабатывает все производные потоки одновременно, их элементы могут поступить в любой момент. В результате первоначальный порядок теряется. Если важен порядок элементов, рассмотрите возможность использования оператора flatMapSequential
.
5. Различия между картой
и плоской картой
До сих пор мы рассмотрели операторы map
и flatMap
. Давайте закончим с основными различиями между ними.
5.1. Один к одному против одного ко многим
Оператор карты
применяет преобразование «один к одному» к элементам потока, в то время как flatMap
выполняет преобразование «один ко многим» . Это различие становится очевидным при взгляде на сигнатуру метода:
<V> Flux<V> map(Function<? super T, ? extends V> mapper)
— преобразователь преобразует одно значение типаT
в одно значение типаV.
Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
— преобразователь преобразует одно значение типаT
вPublisher
элементов типаR
Мы видим, что с точки зрения функциональности разница между map
и flatMap
в Project Reactor аналогична разнице между map
и flatMap
в Java Stream API.
5.2. Синхронный против асинхронного
Вот две выдержки из спецификации API для библиотеки Reactor Core:
map
: преобразуйте элементы, испускаемые этимпотоком
, применяя синхронную функцию к каждому элементу.flatMap
: асинхронно преобразовать элементы, испускаемые этимпотоком
, виздателей .
Легко видеть, что map
— это синхронный оператор — это просто метод, который преобразует одно значение в другое. Этот метод выполняется в том же потоке, что и вызывающий.
Другое утверждение — flatMap
асинхронно — не так ясно. На самом деле преобразование элементов в Publishers
может быть как синхронным, так и асинхронным.
В нашем примере кода эта операция является синхронной, поскольку мы испускаем элементы с помощью метода Flux#just .
Однако при работе с источником с высокой задержкой, например с удаленным сервером, асинхронная обработка является лучшим вариантом .
Важным моментом является то, что пайплайну все равно, из каких потоков приходят элементы — он просто обращает внимание на самих издателей.
6. Заключение
В этой статье мы рассмотрели операторы map
и flatMap
в Project Reactor. Мы обсудили пару примеров и разъяснили процесс.
Как обычно, исходный код нашего приложения доступен на GitHub .