1. Введение
Это руководство представляет собой руководство по функциональным возможностям и вариантам использования класса CompletableFuture
, который был представлен как улучшение API параллелизма Java 8.
2. Асинхронные вычисления в Java
Трудно рассуждать об асинхронных вычислениях. Обычно мы хотим думать о любом вычислении как о серии шагов, но в случае асинхронного вычисления действия, представленные в виде обратных вызовов, как правило, либо разбросаны по коду, либо глубоко вложены друг в друга . Все становится еще хуже, когда нам нужно обработать ошибки, которые могут возникнуть на одном из шагов.
Интерфейс Future
был добавлен в Java 5 как результат асинхронных вычислений, но в нем не было методов для объединения этих вычислений или обработки возможных ошибок.
В Java 8 появился класс CompletableFuture .
Наряду с интерфейсом Future
он также реализовал интерфейс CompletionStage .
Этот интерфейс определяет контракт для шага асинхронных вычислений, который мы можем комбинировать с другими шагами.
CompletableFuture
— это одновременно строительный блок и фреймворк с примерно 50 различными методами для составления, объединения и выполнения шагов асинхронных вычислений и обработки ошибок .
Такой большой API может быть ошеломляющим, но в основном это относится к нескольким четким и четким вариантам использования.
3. Использование CompletableFuture
в качестве простого будущего
Во-первых, класс CompletableFuture
реализует интерфейс Future
, поэтому мы можем использовать его как реализацию Future
, но с дополнительной логикой завершения .
Например, мы можем создать экземпляр этого класса с конструктором без аргументов, чтобы представить какой-то будущий результат, передать его потребителям и завершить его в какой-то момент в будущем, используя метод complete
. Потребители могут использовать метод get
, чтобы заблокировать текущий поток, пока не будет предоставлен этот результат.
В приведенном ниже примере у нас есть метод, который создает экземпляр CompletableFuture
, затем запускает некоторые вычисления в другом потоке и немедленно возвращает Future .
Когда вычисление завершено, метод завершает Future
, предоставляя результат методу complete
:
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
Чтобы раскрутить вычисления, мы используем Executor
API. Этот метод создания и завершения CompletableFuture
можно использовать вместе с любым механизмом параллелизма или API, включая необработанные потоки.
Обратите внимание, что метод calculateAsync
возвращает экземпляр Future
.
Мы просто вызываем метод, получаем экземпляр Future
и вызываем для него метод get
, когда готовы заблокировать результат.
Также обратите внимание, что метод get
генерирует некоторые проверенные исключения, а именно ExecutionException
(инкапсулирует исключение, возникшее во время вычисления) и InterruptedException
(исключение, означающее, что поток, выполняющий метод, был прерван):
Future<String> completableFuture = calculateAsync();
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
Если мы уже знаем результат вычисления , мы можем использовать статический метод completeFuture
с аргументом, представляющим результат этого вычисления. Следовательно, метод get
Future
никогда не будет блокироваться, вместо этого немедленно возвращая этот результат:
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
В качестве альтернативного сценария мы можем захотеть отменить выполнение Future
.
4. CompletableFuture
с инкапсулированной логикой вычислений
Приведенный выше код позволяет нам выбрать любой механизм одновременного выполнения, но что, если мы хотим пропустить этот шаблон и просто выполнить какой-то код асинхронно?
Статические методы runAsync
и SupplyAsync
позволяют создать экземпляр CompletableFuture
из функциональных типов Runnable
и Supplier
соответственно.
И Runnable
, и Supplier
— это функциональные интерфейсы, которые позволяют передавать свои экземпляры в виде лямбда-выражений благодаря новой функции Java 8.
Интерфейс Runnable
— это тот же самый старый интерфейс, который используется в потоках, и он не позволяет возвращать значение.
Интерфейс Supplier
— это универсальный функциональный интерфейс с одним методом, который не имеет аргументов и возвращает значение параметризованного типа.
Это позволяет нам предоставить экземпляр Supplier
в виде лямбда-выражения, которое выполняет вычисления и возвращает результат . Это так же просто, как:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
5. Обработка результатов асинхронных вычислений
Самый общий способ обработки результата вычисления — передать его функции. Метод thenApply
делает именно это; он принимает экземпляр Function
, использует его для обработки результата и возвращает Future
, который содержит значение, возвращаемое функцией:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
Если нам не нужно возвращать значение по цепочке Future
, мы можем использовать экземпляр функционального интерфейса Consumer .
Его единственный метод принимает параметр и возвращает void
.
В CompletableFuture есть метод для этого варианта использования.
Метод thenAccept
получает Consumer
и передает ему результат вычисления. Затем последний вызов future.get()
возвращает экземпляр типа Void
:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
Наконец, если нам не нужно значение вычисления и мы не хотим возвращать какое-то значение в конце цепочки, мы можем передать лямбду Runnable методу
thenRun
. В следующем примере мы просто выводим строку в консоль после вызова future.get():
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
6. Объединение фьючерсов
Лучшая часть API CompletableFuture — это
возможность комбинировать экземпляры CompletableFuture
в цепочке шагов вычислений .
Результатом этой цепочки является само CompletableFuture
, который допускает дальнейшую цепочку и комбинирование. Этот подход повсеместно используется в функциональных языках и часто упоминается как монадический шаблон проектирования.
В следующем примере мы используем метод thenCompose
для последовательного связывания двух фьючерсов
.
Обратите внимание, что этот метод принимает функцию, которая возвращает экземпляр CompletableFuture .
Аргумент этой функции является результатом предыдущего шага вычисления. Это позволяет нам использовать это значение внутри следующей лямбды CompletableFuture
:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
Метод thenCompose
вместе с thenApply
реализуют базовые строительные блоки монадического шаблона. Они тесно связаны с методами map
и flatMap классов
Stream
и Optional
, также доступных в Java 8.
Оба метода получают функцию и применяют ее к результату вычислений, но метод thenCompose
( flatMap
) получает функцию, которая возвращает другой объект того же типа . Эта функциональная структура позволяет составлять экземпляры этих классов как строительные блоки.
Если мы хотим выполнить два независимых Future
и что-то сделать с их результатами, мы можем использовать метод thenCombine
, который принимает Future
и Function
с двумя аргументами для обработки обоих результатов:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
Более простой случай, когда мы хотим что-то сделать с двумя результатами Futures
, но нам не нужно передавать какое-либо результирующее значение по цепочке Future .
Метод thenAcceptBoth
поможет:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
7. Разница между thenApply()
и thenCompose()
В наших предыдущих разделах мы показали примеры, касающиеся thenApply()
и thenCompose()
. Оба API помогают связывать разные вызовы CompletableFuture
, но использование этих двух функций отличается.
7.1. затемПрименить()
Мы можем использовать этот метод для работы с результатом предыдущего вызова. Однако важно помнить, что возвращаемый тип будет объединен из всех вызовов.
Так что этот метод полезен, когда мы хотим преобразовать результат вызова CompletableFuture :
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
7.2. затемСоставить()
Метод thenCompose()
похож на thenApply()
в том смысле, что оба метода возвращают новую стадию завершения. Однако thenCompose()
использует предыдущий этап в качестве аргумента . Он будет сглаживать и возвращать Future
с результатом напрямую, а не вложенным future, как мы наблюдали в thenApply():
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
Поэтому, если идея состоит в том, чтобы объединить методы CompletableFuture
в цепочку , то лучше использовать thenCompose()
.
Также обратите внимание, что разница между этими двумя методами аналогична разнице между map()
и flatMap()
.
8. Запуск нескольких фьючерсов
параллельно
Когда нам нужно выполнить несколько фьючерсов
параллельно, мы обычно хотим дождаться, пока все они будут выполнены, а затем обработать их объединенные результаты.
Статический метод CompletableFuture.allOf
позволяет дождаться завершения всех фьючерсов
, предоставленных в виде var-arg:
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
Обратите внимание, что CompletableFuture.allOf()
возвращает тип CompletableFuture<Void>
. Ограничение этого метода в том, что он не возвращает комбинированные результаты всех фьючерсов
. Вместо этого нам приходится вручную получать результаты из Futures
. К счастью, метод CompletableFuture.join()
и Java 8 Streams API упрощают задачу:
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
Метод CompletableFuture.join()
аналогичен методу get
, но выдает непроверенное исключение в случае, если Future
не завершается нормально. Это позволяет использовать его как ссылку на метод в методе Stream.map()
.
9. Обработка ошибок
Для обработки ошибок в цепочке шагов асинхронных вычислений мы должны аналогичным образом адаптировать идиому throw/catch .
Вместо перехвата исключения в синтаксическом блоке класс CompletableFuture
позволяет нам обрабатывать его в специальном методе дескриптора .
Этот метод получает два параметра: результат вычисления (если оно завершилось успешно) и выброшенное исключение (если какой-то шаг вычисления не завершился нормально).
В следующем примере мы используем метод handle
для предоставления значения по умолчанию, когда асинхронное вычисление приветствия было завершено с ошибкой, поскольку не было указано имя:
String name = null;
// ...
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
В качестве альтернативного сценария предположим, что мы хотим вручную завершить Future
со значением, как в первом примере, но также иметь возможность завершить его с исключением. Именно для этого и предназначен метод completeExceptionally .
Метод completableFuture.get()
в следующем примере создает ExecutionException
с RuntimeException
в качестве причины:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
В приведенном выше примере мы могли бы обработать исключение асинхронно с помощью метода handle
, но с помощью метода get
мы можем использовать более типичный подход синхронной обработки исключений.
10. Асинхронные методы
Большинство методов свободного API в классе CompletableFuture
имеют два дополнительных варианта с постфиксом Async .
Эти методы обычно предназначены для запуска соответствующего шага выполнения в другом потоке .
Методы без постфикса Async
запускают следующую стадию выполнения с использованием вызывающего потока. Напротив, метод Async
без аргумента Executor
выполняет шаг, используя общую реализацию пула fork/join
Executor
, доступ к которой осуществляется с помощью метода ForkJoinPool.commonPool()
. Наконец, метод Async
с аргументом Executor
выполняет шаг, используя переданный Executor
.
Вот модифицированный пример, который обрабатывает результат вычисления с экземпляром Function .
Единственным видимым отличием является метод thenApplyAsync
, но под капотом приложение функции завернуто в экземпляр ForkJoinTask
(подробнее о фреймворке fork/join
см. в статье «Руководство по фреймворку Fork/Join в Java» ) . . Это позволяет нам еще больше распараллелить наши вычисления и более эффективно использовать системные ресурсы:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
11. JDK 9 CompletableFuture
API
Java 9 расширяет API CompletableFuture следующими изменениями:
- Добавлены новые фабричные методы
- Поддержка задержек и тайм-аутов
- Улучшенная поддержка подклассов
и новые API экземпляров:
Исполнитель по умолчаниюExecutor()
CompletableFuture<U> newIncompleteFuture()
CompletableFuture<T> копировать()
CompletionStage<T> минимальныйCompletionStage()
CompletableFuture<T> completeAsync (поставщик <? расширяет поставщика T>, исполнитель-исполнитель)
CompletableFuture<T> completeAsync (поставщик<? расширяет поставщика T>)
CompletableFuture<T> orTimeout (длительный тайм-аут, блок TimeUnit)
CompletableFuture<T> completeOnTimeout (значение T, длительное время ожидания, единица измерения TimeUnit)
Также теперь у нас есть несколько статических служебных методов:
Executor delayedExecutor (длительная задержка, модуль TimeUnit, исполнитель Executor)
Executor delayedExecutor (длительная задержка, блок TimeUnit)
<U> CompletionStage<U> CompletedStage (значение U)
<U> CompletionStage<U> failedStage (выбрасываемый ex)
<U> CompletableFuture<U> failedFuture (Throwable ex)
Наконец, для решения проблемы тайм-аута в Java 9 представлены еще две новые функции:
илиТаймаут()
завершить по таймауту ()
Вот подробная статья для дальнейшего чтения: Java 9 CompletableFuture API Improvements .
12. Заключение
В этой статье мы описали методы и типичные варианты использования класса CompletableFuture
.
Исходный код статьи доступен на GitHub .