Перейти к основному содержимому

Руководство по CompletableFuture

· 10 мин. чтения

1. Введение

Это руководство представляет собой руководство по функциональным возможностям и вариантам использования класса CompletableFuture , который был представлен как улучшение API параллелизма Java 8.

2. Асинхронные вычисления в Java

Трудно рассуждать об асинхронных вычислениях. Обычно мы хотим думать о любом вычислении как о серии шагов, но в случае асинхронного вычисления действия, представленные в виде обратных вызовов, как правило, либо разбросаны по коду, либо глубоко вложены друг в друга . Все становится еще хуже, когда нам нужно обработать ошибки, которые могут возникнуть на одном из шагов.

Интерфейс Future был добавлен в Java 5 как результат асинхронных вычислений, но в нем не было методов для объединения этих вычислений или обработки возможных ошибок.

В Java 8 появился класс CompletableFuture . Наряду с интерфейсом Future он также реализовал интерфейс CompletionStage . Этот интерфейс определяет контракт для шага асинхронных вычислений, который мы можем комбинировать с другими шагами.

CompletableFuture — это одновременно строительный блок и фреймворк с примерно 50 различными методами для составления, объединения и выполнения шагов асинхронных вычислений и обработки ошибок .

Такой большой API может быть ошеломляющим, но в основном это относится к нескольким четким и четким вариантам использования.

3. Использование CompletableFuture в качестве простого будущего

Во-первых, класс CompletableFuture реализует интерфейс Future , поэтому мы можем использовать его как реализацию Future , но с дополнительной логикой завершения .

Например, мы можем создать экземпляр этого класса с конструктором без аргументов, чтобы представить какой-то будущий результат, передать его потребителям и завершить его в какой-то момент в будущем, используя метод complete . Потребители могут использовать метод get , чтобы заблокировать текущий поток, пока не будет предоставлен этот результат.

В приведенном ниже примере у нас есть метод, который создает экземпляр CompletableFuture , затем запускает некоторые вычисления в другом потоке и немедленно возвращает Future .

Когда вычисление завершено, метод завершает Future , предоставляя результат методу complete :

public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();

Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});

return completableFuture;
}

Чтобы раскрутить вычисления, мы используем Executor API. Этот метод создания и завершения CompletableFuture можно использовать вместе с любым механизмом параллелизма или API, включая необработанные потоки.

Обратите внимание, что метод calculateAsync возвращает экземпляр Future .

Мы просто вызываем метод, получаем экземпляр Future и вызываем для него метод get , когда готовы заблокировать результат.

Также обратите внимание, что метод get генерирует некоторые проверенные исключения, а именно ExecutionException (инкапсулирует исключение, возникшее во время вычисления) и InterruptedException (исключение, означающее, что поток, выполняющий метод, был прерван):

Future<String> completableFuture = calculateAsync();

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

Если мы уже знаем результат вычисления , мы можем использовать статический метод completeFuture с аргументом, представляющим результат этого вычисления. Следовательно, метод get Future никогда не будет блокироваться, вместо этого немедленно возвращая этот результат:

Future<String> completableFuture = 
CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

В качестве альтернативного сценария мы можем захотеть отменить выполнение Future .

4. CompletableFuture с инкапсулированной логикой вычислений

Приведенный выше код позволяет нам выбрать любой механизм одновременного выполнения, но что, если мы хотим пропустить этот шаблон и просто выполнить какой-то код асинхронно?

Статические методы runAsync и SupplyAsync позволяют создать экземпляр CompletableFuture из функциональных типов Runnable и Supplier соответственно.

И Runnable , и Supplier — это функциональные интерфейсы, которые позволяют передавать свои экземпляры в виде лямбда-выражений благодаря новой функции Java 8.

Интерфейс Runnable — это тот же самый старый интерфейс, который используется в потоках, и он не позволяет возвращать значение.

Интерфейс Supplier — это универсальный функциональный интерфейс с одним методом, который не имеет аргументов и возвращает значение параметризованного типа.

Это позволяет нам предоставить экземпляр Supplier в виде лямбда-выражения, которое выполняет вычисления и возвращает результат . Это так же просто, как:

CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5. Обработка результатов асинхронных вычислений

Самый общий способ обработки результата вычисления — передать его функции. Метод thenApply делает именно это; он принимает экземпляр Function , использует его для обработки результата и возвращает Future , который содержит значение, возвращаемое функцией:

CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

Если нам не нужно возвращать значение по цепочке Future , мы можем использовать экземпляр функционального интерфейса Consumer . Его единственный метод принимает параметр и возвращает void .

В CompletableFuture есть метод для этого варианта использования. Метод thenAccept получает Consumer и передает ему результат вычисления. Затем последний вызов future.get() возвращает экземпляр типа Void :

CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

Наконец, если нам не нужно значение вычисления и мы не хотим возвращать какое-то значение в конце цепочки, мы можем передать лямбду Runnable методу thenRun . В следующем примере мы просто выводим строку в консоль после вызова future.get():

CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));

future.get();

6. Объединение фьючерсов

Лучшая часть API CompletableFuture — это возможность комбинировать экземпляры CompletableFuture в цепочке шагов вычислений .

Результатом этой цепочки является само CompletableFuture , который допускает дальнейшую цепочку и комбинирование. Этот подход повсеместно используется в функциональных языках и часто упоминается как монадический шаблон проектирования.

В следующем примере мы используем метод thenCompose для последовательного связывания двух фьючерсов .

Обратите внимание, что этот метод принимает функцию, которая возвращает экземпляр CompletableFuture . Аргумент этой функции является результатом предыдущего шага вычисления. Это позволяет нам использовать это значение внутри следующей лямбды CompletableFuture :

CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

Метод thenCompose вместе с thenApply реализуют базовые строительные блоки монадического шаблона. Они тесно связаны с методами map и flatMap классов Stream и Optional , также доступных в Java 8.

Оба метода получают функцию и применяют ее к результату вычислений, но метод thenCompose ( flatMap ) получает функцию, которая возвращает другой объект того же типа . Эта функциональная структура позволяет составлять экземпляры этих классов как строительные блоки.

Если мы хотим выполнить два независимых Future и что-то сделать с их результатами, мы можем использовать метод thenCombine , который принимает Future и Function с двумя аргументами для обработки обоих результатов:

CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

Более простой случай, когда мы хотим что-то сделать с двумя результатами Futures , но нам не нужно передавать какое-либо результирующее значение по цепочке Future . Метод thenAcceptBoth поможет:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));

7. Разница между thenApply() и thenCompose()

В наших предыдущих разделах мы показали примеры, касающиеся thenApply() и thenCompose() . Оба API помогают связывать разные вызовы CompletableFuture , но использование этих двух функций отличается.

7.1. затемПрименить()

Мы можем использовать этот метод для работы с результатом предыдущего вызова. Однако важно помнить, что возвращаемый тип будет объединен из всех вызовов.

Так что этот метод полезен, когда мы хотим преобразовать результат вызова CompletableFuture :

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

7.2. затемСоставить()

Метод thenCompose() похож на thenApply() в том смысле, что оба метода возвращают новую стадию завершения. Однако thenCompose() использует предыдущий этап в качестве аргумента . Он будет сглаживать и возвращать Future с результатом напрямую, а не вложенным future, как мы наблюдали в thenApply():

CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

Поэтому, если идея состоит в том, чтобы объединить методы CompletableFuture в цепочку , то лучше использовать thenCompose() .

Также обратите внимание, что разница между этими двумя методами аналогична разнице между map() и flatMap() .

8. Запуск нескольких фьючерсов параллельно

Когда нам нужно выполнить несколько фьючерсов параллельно, мы обычно хотим дождаться, пока все они будут выполнены, а затем обработать их объединенные результаты.

Статический метод CompletableFuture.allOf позволяет дождаться завершения всех фьючерсов , предоставленных в виде var-arg:

CompletableFuture<String> future1  
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

Обратите внимание, что CompletableFuture.allOf() возвращает тип CompletableFuture<Void> . Ограничение этого метода в том, что он не возвращает комбинированные результаты всех фьючерсов . Вместо этого нам приходится вручную получать результаты из Futures . К счастью, метод CompletableFuture.join() и Java 8 Streams API упрощают задачу:

String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

Метод CompletableFuture.join() аналогичен методу get , но выдает непроверенное исключение в случае, если Future не завершается нормально. Это позволяет использовать его как ссылку на метод в методе Stream.map() .

9. Обработка ошибок

Для обработки ошибок в цепочке шагов асинхронных вычислений мы должны аналогичным образом адаптировать идиому throw/catch .

Вместо перехвата исключения в синтаксическом блоке класс CompletableFuture позволяет нам обрабатывать его в специальном методе дескриптора . Этот метод получает два параметра: результат вычисления (если оно завершилось успешно) и выброшенное исключение (если какой-то шаг вычисления не завершился нормально).

В следующем примере мы используем метод handle для предоставления значения по умолчанию, когда асинхронное вычисление приветствия было завершено с ошибкой, поскольку не было указано имя:

String name = null;

// ...

CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

В качестве альтернативного сценария предположим, что мы хотим вручную завершить Future со значением, как в первом примере, но также иметь возможность завершить его с исключением. Именно для этого и предназначен метод completeExceptionally . Метод completableFuture.get() в следующем примере создает ExecutionException с RuntimeException в качестве причины:

CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

В приведенном выше примере мы могли бы обработать исключение асинхронно с помощью метода handle , но с помощью метода get мы можем использовать более типичный подход синхронной обработки исключений.

10. Асинхронные методы

Большинство методов свободного API в классе CompletableFuture имеют два дополнительных варианта с постфиксом Async . Эти методы обычно предназначены для запуска соответствующего шага выполнения в другом потоке .

Методы без постфикса Async запускают следующую стадию выполнения с использованием вызывающего потока. Напротив, метод Async без аргумента Executor выполняет шаг, используя общую реализацию пула fork/join Executor , доступ к которой осуществляется с помощью метода ForkJoinPool.commonPool() . Наконец, метод Async с аргументом Executor выполняет шаг, используя переданный Executor .

Вот модифицированный пример, который обрабатывает результат вычисления с экземпляром Function . Единственным видимым отличием является метод thenApplyAsync , но под капотом приложение функции завернуто в экземпляр ForkJoinTask (подробнее о фреймворке fork/join см. в статье «Руководство по фреймворку Fork/Join в Java» ) . . Это позволяет нам еще больше распараллелить наши вычисления и более эффективно использовать системные ресурсы:

CompletableFuture<String> completableFuture  
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

Java 9 расширяет API CompletableFuture следующими изменениями:

  • Добавлены новые фабричные методы
  • Поддержка задержек и тайм-аутов
  • Улучшенная поддержка подклассов

и новые API экземпляров:

  • Исполнитель по умолчаниюExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> копировать()
  • CompletionStage<T> минимальныйCompletionStage()
  • CompletableFuture<T> completeAsync (поставщик <? расширяет поставщика T>, исполнитель-исполнитель)
  • CompletableFuture<T> completeAsync (поставщик<? расширяет поставщика T>)
  • CompletableFuture<T> orTimeout (длительный тайм-аут, блок TimeUnit)
  • CompletableFuture<T> completeOnTimeout (значение T, длительное время ожидания, единица измерения TimeUnit)

Также теперь у нас есть несколько статических служебных методов:

  • Executor delayedExecutor (длительная задержка, модуль TimeUnit, исполнитель Executor)
  • Executor delayedExecutor (длительная задержка, блок TimeUnit)
  • <U> CompletionStage<U> CompletedStage (значение U)
  • <U> CompletionStage<U> failedStage (выбрасываемый ex)
  • <U> CompletableFuture<U> failedFuture (Throwable ex)

Наконец, для решения проблемы тайм-аута в Java 9 представлены еще две новые функции:

  • илиТаймаут()
  • завершить по таймауту ()

Вот подробная статья для дальнейшего чтения: Java 9 CompletableFuture API Improvements .

12. Заключение

В этой статье мы описали методы и типичные варианты использования класса CompletableFuture .

Исходный код статьи доступен на GitHub .