Перейти к основному содержимому

Руководство по java.util.concurrent.Future

· 9 мин. чтения

Задача: Наибольшая подстрока без повторений

Для заданной строки s, найдите длину наибольшей подстроки без повторяющихся символов. Подстрока — это непрерывная непустая последовательность символов внутри строки...

ANDROMEDA 42

1. Обзор

В этом уроке мы узнаем о Future . Интерфейс, существующий со времен Java 1.5, может быть весьма полезен при работе с асинхронными вызовами и параллельной обработкой.

2. Создание фьючерсов

Проще говоря, класс Future представляет будущий результат асинхронного вычисления. Этот результат в конечном итоге появится в Будущем после завершения обработки.

Давайте посмотрим, как писать методы, которые создают и возвращают экземпляр Future .

Длительные методы являются хорошими кандидатами для асинхронной обработки и интерфейса Future , поскольку мы можем выполнять другие процессы, пока ждем завершения задачи, инкапсулированной в Future .

Ниже приведены некоторые примеры операций, использующих асинхронный характер Future :

  • вычислительно-интенсивные процессы (математические и научные расчеты)
  • манипулирование большими структурами данных (big data)
  • удаленные вызовы методов (загрузка файлов, удаление HTML, веб-сервисы)

2.1. Реализация фьючерсов с помощью FutureTask

Для нашего примера мы собираемся создать очень простой класс, который вычисляет квадрат Integer . Это определенно не относится к категории долговременных методов, но мы собираемся поместить в него вызов Thread.sleep() , чтобы он длился 1 секунду до завершения:

public class SquareCalculator {    

private ExecutorService executor
= Executors.newSingleThreadExecutor();

public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}

Часть кода, которая фактически выполняет вычисление, содержится в методе call() и предоставляется в виде лямбда-выражения. Как мы видим, в этом нет ничего особенного, кроме упомянутого ранее вызова sleep() .

Становится интереснее, когда мы обращаем внимание на использование Callable и ExecutorService .

Callable — это интерфейс, представляющий задачу, которая возвращает результат и имеет единственный метод call() . Здесь мы создали его экземпляр, используя лямбда-выражение.

Создание экземпляра Callable никуда не приведет; нам все еще нужно передать этот экземпляр исполнителю, который позаботится о запуске задачи в новом потоке и вернет нам ценный объект Future . Вот тут-то и появляется ExecutorService .

Есть несколько способов получить доступ к экземпляру ExecutorService , и большинство из них предоставляется статическими фабричными методами служебного класса Executors . В этом примере мы использовали базовый метод newSingleThreadExecutor() , который дает нам ExecutorService , способный обрабатывать один поток за раз.

Когда у нас есть объект ExecutorService , нам просто нужно вызвать submit(), передав наш Callable в качестве аргумента. Затем submit() запустит задачу и вернет объект FutureTask , который является реализацией интерфейса Future .

3. Потребление фьючерсов

До этого момента мы научились создавать экземпляр Future .

В этом разделе мы узнаем, как работать с этим экземпляром, изучив все методы, которые являются частью Future API.

3.1. Использование isDone() и get() для получения результатов

Теперь нам нужно вызвать calculate() и использовать возвращенное Future для получения результирующего Integer . С этой задачей нам помогут два метода из Future API.

Future.isDone() сообщает нам, завершил ли исполнитель задачу. Если задача завершена, она вернет true; в противном случае возвращается false .

Метод, возвращающий фактический результат вычисления, — Future.get() . Мы видим, что этот метод блокирует выполнение до тех пор, пока задача не будет завершена. Однако в нашем примере это не будет проблемой, потому что мы проверим, завершена ли задача, вызвав isDone() .

Используя эти два метода, мы можем запускать другой код, пока ждем завершения основной задачи:

Future<Integer> future = new SquareCalculator().calculate(10);

while(!future.isDone()) {
System.out.println("Calculating...");
Thread.sleep(300);
}

Integer result = future.get();

В этом примере мы напишем простое сообщение на выходе, чтобы сообщить пользователю, что программа выполняет расчет.

Метод get() блокирует выполнение до тех пор, пока задача не будет завершена. Опять же, это не будет проблемой, потому что в нашем примере get() будет вызываться только после того, как мы убедимся, что задача завершена. Таким образом, в этом сценарии future.get() всегда будет возвращаться немедленно.

Стоит отметить, что у get() есть перегруженная версия, которая принимает таймаут и TimeUnit в качестве аргументов:

Integer result = future.get(500, TimeUnit.MILLISECONDS);

Разница между get(long, TimeUnit) и get() заключается в том, что первый вызовет TimeoutException , если задача не вернется до указанного периода ожидания.

3.2. Отмена Future с помощью cancel()

Допустим, мы запустили задачу, но результат по какой-то причине нас больше не волнует. Мы можем использовать Future.cancel(boolean) , чтобы указать исполнителю остановить операцию и прервать ее основной поток:

Future<Integer> future = new SquareCalculator().calculate(4);

boolean canceled = future.cancel(true);

Наш экземпляр Future из приведенного выше кода никогда не завершит свою работу. На самом деле, если мы попытаемся вызвать get() из этого экземпляра, после вызова cancel() результатом будет CancellationException . Future.isCancelled() сообщит нам, было ли Future уже отменено. Это может быть очень полезно, чтобы избежать исключения CancellationException .

Также возможно, что вызов cancel() завершится ошибкой. В этом случае возвращаемое значение будет false . Важно отметить, что cancel() принимает логическое значение в качестве аргумента. Это определяет, должен ли поток, выполняющий задачу, быть прерван или нет.

4. Больше многопоточности с пулами потоков

Наш текущий ExecutorService является однопоточным, поскольку он был получен с помощью Executors.newSingleThreadExecutor . Чтобы выделить этот единственный поток, давайте запустим два вычисления одновременно:

SquareCalculator squareCalculator = new SquareCalculator();

Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);

while (!(future1.isDone() && future2.isDone())) {
System.out.println(
String.format(
"future1 is %s and future2 is %s",
future1.isDone() ? "done" : "not done",
future2.isDone() ? "done" : "not done"
)
);
Thread.sleep(300);
}

Integer result1 = future1.get();
Integer result2 = future2.get();

System.out.println(result1 + " and " + result2);

squareCalculator.shutdown();

Теперь давайте проанализируем вывод для этого кода:

calculating square for: 10
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
calculating square for: 100
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000

Понятно, что процесс не параллельный. Мы видим, что вторая задача запускается только после завершения первой задачи, поэтому весь процесс занимает около 2 секунд.

Чтобы сделать нашу программу действительно многопоточной, мы должны использовать другую разновидность ExecutorService . Давайте посмотрим, как изменится поведение нашего примера, если мы используем пул потоков, предоставляемый фабричным методом Executors.newFixedThreadPool() :

public class SquareCalculator {

private ExecutorService executor = Executors.newFixedThreadPool(2);

//...
}

С помощью простого изменения в нашем классе SquareCalculator теперь у нас есть исполнитель, который может использовать 2 одновременных потока.

Если мы снова запустим тот же клиентский код, мы получим следующий вывод:

calculating square for: 10
calculating square for: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
100 and 10000

Теперь это выглядит намного лучше. Мы видим, что две задачи начинаются и заканчиваются одновременно, и весь процесс занимает около 1 секунды.

Существуют и другие фабричные методы, которые можно использовать для создания пулов потоков, например Executors.newCachedThreadPool() , который повторно использует ранее использованные потоки , когда они доступны, и Executors.newScheduledThreadPool() , который планирует запуск команд после заданной задержки .

Для получения дополнительной информации о ExecutorService прочитайте нашу статью , посвященную этой теме.

5. Обзор ForkJoinTask

ForkJoinTask — это абстрактный класс, который реализует Future и способен выполнять большое количество задач, размещенных в небольшом количестве реальных потоков в ForkJoinPool .

В этом разделе мы быстро рассмотрим основные характеристики ForkJoinPool . Подробное руководство по этой теме можно найти в нашем Руководстве по платформе Fork/Join в Java .

Основная характеристика ForkJoinTask заключается в том, что он обычно порождает новые подзадачи как часть работы, необходимой для выполнения его основной задачи. Он генерирует новые задачи, вызывая fork() , и собирает все результаты с помощью join() , отсюда и название класса.

Есть два абстрактных класса, которые реализуют ForkJoinTask : RecursiveTask , который возвращает значение по завершении, и RecursiveAction , который ничего не возвращает. Как следует из их названий, эти классы должны использоваться для рекурсивных задач, таких как навигация по файловой системе или сложные математические вычисления.

Давайте расширим наш предыдущий пример, чтобы создать класс, который, учитывая Integer , будет вычислять квадраты суммы для всех своих факториальных элементов. Так, например, если мы передаем число 4 в наш калькулятор, мы должны получить результат из суммы 4² + 3² + 2² + 1², что равно 30.

Во-первых, нам нужно создать конкретную реализацию RecursiveTask и реализовать ее метод calculate() . Здесь мы напишем нашу бизнес-логику:

public class FactorialSquareCalculator extends RecursiveTask<Integer> {

private Integer n;

public FactorialSquareCalculator(Integer n) {
this.n = n;
}

@Override
protected Integer compute() {
if (n <= 1) {
return n;
}

FactorialSquareCalculator calculator
= new FactorialSquareCalculator(n - 1);

calculator.fork();

return n * n + calculator.join();
}
}

Обратите внимание, как мы достигаем рекурсивности, создавая новый экземпляр FactorialSquareCalculator внутри calculate() . Вызывая неблокирующий метод fork() , мы просим ForkJoinPool инициировать выполнение этой подзадачи.

Метод join() вернет результат этого вычисления, к которому мы добавим квадрат числа, которое мы сейчас посещаем.

Теперь нам просто нужно создать ForkJoinPool для обработки выполнения и управления потоками:

ForkJoinPool forkJoinPool = new ForkJoinPool();

FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);

forkJoinPool.execute(calculator);

6. Заключение

В этой статье мы всесторонне изучили интерфейс Future , затронув все его методы. Мы также узнали, как использовать возможности пулов потоков для запуска нескольких параллельных операций. Также были кратко рассмотрены основные методы класса ForkJoinTask , fork() и join() .

У нас есть много других замечательных статей о параллельных и асинхронных операциях в Java. Вот три из них, тесно связанных с интерфейсом Future , некоторые из которых уже упоминались в статье:

Как всегда, исходный код, использованный в этой статье, можно найти в нашем репозитории GitHub .