Перейти к основному содержимому

Руководство по Java ExecutorService

· 7 мин. чтения

1. Обзор

ExecutorService — это API JDK, упрощающий выполнение задач в асинхронном режиме. Вообще говоря, ExecutorService автоматически предоставляет пул потоков и API для назначения ему задач.

2. Создание экземпляра ExecutorService

2.1. Фабричные методы класса Executors

Самый простой способ создать ExecutorService — использовать один из фабричных методов класса Executors .

Например, следующая строка кода создаст пул потоков с 10 потоками:

ExecutorService executor = Executors.newFixedThreadPool(10);

Существует несколько других фабричных методов для создания предопределенной службы ExecutorService , соответствующей конкретным случаям использования. Чтобы найти лучший метод для ваших нужд, обратитесь к официальной документации Oracle .

2.2. Непосредственное создание ExecutorService

Поскольку ExecutorService — это интерфейс, можно использовать экземпляр любой его реализации. В пакете java.util.concurrent есть несколько реализаций на выбор , или вы можете создать свою собственную.

Например, у класса ThreadPoolExecutor есть несколько конструкторов, которые мы можем использовать для настройки службы-исполнителя и ее внутреннего пула:

ExecutorService executorService = 
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

Вы можете заметить, что приведенный выше код очень похож на исходный код фабричного метода newSingleThreadExecutor(). В большинстве случаев детальная ручная настройка не требуется.

3. Назначение задач ExecutorService

ExecutorService может выполнять задачи Runnable и Callable . Для простоты в этой статье будут использоваться две примитивные задачи. Обратите внимание, что здесь мы используем лямбда-выражения вместо анонимных внутренних классов:

Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};

Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

Мы можем назначать задачи ExecutorService , используя несколько методов, включая execute() , который унаследован от интерфейса Executor , а также submit() , invokeAny() и invokeAll() .

Метод execute() недействителен и не дает никакой возможности получить результат выполнения задачи или проверить статус задачи (выполняется ли она):

executorService.execute(runnableTask);

submit() отправляет задачу Callable или Runnable в ExecutorService и возвращает результат типа Future :

Future<String> future = 
executorService.submit(callableTask);

invokeAny() присваивает набор задач ExecutorService , вызывая выполнение каждой из них, и возвращает результат успешного выполнения одной задачи (если было успешное выполнение):

String result = executorService.invokeAny(callableTasks);

invokeAll() присваивает ExecutorService набор задач , вызывая выполнение каждой из них, и возвращает результат выполнения всех задач в виде списка объектов типа Future :

List<Future<String>> futures = executorService.invokeAll(callableTasks);

Прежде чем идти дальше, нам нужно обсудить еще два вопроса: закрытие ExecutorService и работа с типами возврата Future .

4. Завершение работы ExecutorService

Как правило, ExecutorService не будет автоматически уничтожен, если нет задачи для обработки. Он останется в живых и будет ждать новой работы.

В некоторых случаях это очень полезно, например, когда приложению необходимо обрабатывать задачи, которые появляются нерегулярно, или количество задач неизвестно во время компиляции.

С другой стороны, приложение может достичь своего завершения, но не быть остановленным, потому что ожидающий ExecutorService заставит JVM продолжать работу.

Чтобы правильно закрыть ExecutorService , у нас есть API-интерфейсы shutdown() и shutdownNow() .

Метод shutdown() не вызывает немедленного уничтожения ExecutorService . Это заставит ExecutorService прекратить принимать новые задачи и закрыться после того, как все запущенные потоки закончат свою текущую работу:

executorService.shutdown();

Метод shutdownNow() пытается немедленно уничтожить ExecutorService , но не гарантирует, что все запущенные потоки будут остановлены одновременно:

List<Runnable> notExecutedTasks = executorService.shutDownNow();

Этот метод возвращает список задач, ожидающих обработки. Разработчик сам решает, что делать с этими задачами.

Один хороший способ закрыть ExecutorService (который также рекомендуется Oracle ) — использовать оба этих метода в сочетании с методом awaitTermination() :

executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}

При таком подходе ExecutorService сначала перестанет принимать новые задачи, а затем будет ждать до указанного периода времени, пока все задачи будут выполнены. Если это время истекает, выполнение немедленно останавливается.

5. Интерфейс будущего

Методы submit() и invokeAll() возвращают объект или набор объектов типа Future , что позволяет нам получить результат выполнения задачи или проверить статус задачи (выполняется ли она).

Интерфейс Future предоставляет специальный блокирующий метод get() , который возвращает фактический результат выполнения задачи Callable или null в случае задачи Runnable :

Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

Вызов метода get() во время выполнения задачи приведет к блокировке выполнения до тех пор, пока задача не выполнится должным образом и результат не будет доступен.

При очень длительной блокировке, вызванной методом get() , производительность приложения может ухудшиться. Если полученные данные не критичны, можно избежать такой проблемы, используя таймауты:

String result = future.get(200, TimeUnit.MILLISECONDS);

Если период выполнения больше указанного (в данном случае 200 миллисекунд), будет выдано исключение TimeoutException .

Мы можем использовать метод isDone() , чтобы проверить, обработана ли уже назначенная задача или нет.

Интерфейс Future также предусматривает отмену выполнения задачи с помощью метода cancel() и проверку отмены с помощью метода isCancelled() :

boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();

6. Интерфейс ScheduledExecutorService

ScheduledExecutorService запускает задачи после некоторой предопределенной задержки и/или периодически.

Опять же, лучший способ создать экземпляр ScheduledExecutorService — использовать фабричные методы класса Executors .

Для этого раздела мы используем ScheduledExecutorService с одним потоком:

ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();

Чтобы запланировать выполнение отдельной задачи после фиксированной задержки, используйте метод Scheduled () службы ScheduledExecutorService .

Два метода schedule() позволяют выполнять задачи Runnable или Callable :

Future<String> resultFuture = 
executorService.schedule(callableTask, 1, TimeUnit.SECONDS);

Метод scheduleAtFixedRate() позволяет нам периодически запускать задачу после фиксированной задержки. Приведенный выше код делает задержку на одну секунду перед выполнением callableTask .

Следующий блок кода запустит задачу после первоначальной задержки в 100 миллисекунд. И после этого он будет запускать одну и ту же задачу каждые 450 миллисекунд:

Future<String> resultFuture = service
.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);

Если процессору требуется больше времени для выполнения назначенной задачи, чем параметр period метода scheduleAtFixedRate() , ScheduledExecutorService будет ждать завершения текущей задачи перед запуском следующей.

Если необходимо иметь фиксированную задержку между итерациями задачи, следует использовать scheduleWithFixedDelay() .

Например, следующий код гарантирует 150-миллисекундную паузу между окончанием текущего выполнения и началом другого:

service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

Согласно контрактам методов scheduleAtFixedRate() и scheduleWithFixedDelay() , период выполнения задачи завершится при завершении ExecutorService или при возникновении исключения во время выполнения задачи .

7. ExecutorService против Fork/Join

После выпуска Java 7 многие разработчики решили заменить инфраструктуру ExecutorService на инфраструктуру fork/join.

Однако это не всегда правильное решение. Несмотря на простоту и частый прирост производительности, связанный с fork/join, он снижает контроль разработчика над параллельным выполнением.

ExecutorService дает разработчику возможность контролировать количество генерируемых потоков и степень детализации задач, которые должны выполняться отдельными потоками. Наилучший вариант использования ExecutorService — обработка независимых задач, таких как транзакции или запросы по схеме «один поток на одну задачу».

Напротив, согласно документации Oracle , fork/join был разработан для ускорения работы, которую можно рекурсивно разбить на более мелкие части.

8. Заключение

Несмотря на относительную простоту ExecutorService , есть несколько распространенных ошибок.

Подытожим их:

Сохранение неиспользуемого ExecutorService в рабочем состоянии : См. подробное объяснение в Разделе 4 о том, как завершить работу ExecutorService .

Неправильная емкость пула потоков при использовании пула потоков фиксированной длины . Очень важно определить, сколько потоков потребуется приложению для эффективного выполнения задач. Слишком большой пул потоков вызовет ненужные накладные расходы только на создание потоков, которые в основном будут находиться в режиме ожидания. Слишком малое количество может привести к тому, что приложение не будет отвечать из-за длительных периодов ожидания задач в очереди.

Вызов метода get() класса Future после `` отмены задачи : попытка получить результат уже отмененной задачи вызывает исключение CancellationException .

Неожиданно долгая блокировка с помощью метода get () класса Future `` : мы должны использовать тайм-ауты, чтобы избежать непредвиденных ожиданий.

Как всегда, код этой статьи доступен в репозитории GitHub .