1. Обзор
ExecutorService
— это API JDK, упрощающий выполнение задач в асинхронном режиме. Вообще говоря, ExecutorService
автоматически предоставляет пул потоков и API для назначения ему задач.
2. Создание экземпляра ExecutorService
2.1. Фабричные методы класса Executors
Самый простой способ создать ExecutorService
— использовать один из фабричных методов класса Executors
.
Например, следующая строка кода создаст пул потоков с 10 потоками:
ExecutorService executor = Executors.newFixedThreadPool(10);
Существует несколько других фабричных методов для создания предопределенной службы ExecutorService
, соответствующей конкретным случаям использования. Чтобы найти лучший метод для ваших нужд, обратитесь к официальной документации Oracle .
2.2. Непосредственное создание ExecutorService
Поскольку ExecutorService
— это интерфейс, можно использовать экземпляр любой его реализации. В пакете java.util.concurrent
есть несколько реализаций на выбор , или вы можете создать свою собственную.
Например, у класса ThreadPoolExecutor
есть несколько конструкторов, которые мы можем использовать для настройки службы-исполнителя и ее внутреннего пула:
ExecutorService executorService =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
Вы можете заметить, что приведенный выше код очень похож на исходный код фабричного метода newSingleThreadExecutor().
В большинстве случаев детальная ручная настройка не требуется.
3. Назначение задач ExecutorService
ExecutorService
может выполнять задачи Runnable
и Callable
. Для простоты в этой статье будут использоваться две примитивные задачи. Обратите внимание, что здесь мы используем лямбда-выражения вместо анонимных внутренних классов:
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
Мы можем назначать задачи ExecutorService
, используя несколько методов, включая execute()
, который унаследован от интерфейса Executor , а также
submit()
, invokeAny()
и invokeAll()
.
Метод execute()
недействителен и
не дает никакой возможности получить результат выполнения задачи или проверить статус задачи (выполняется ли она):
executorService.execute(runnableTask);
submit()
отправляет задачу Callable
или Runnable
в ExecutorService
и возвращает результат типа Future
:
Future<String> future =
executorService.submit(callableTask);
invokeAny()
присваивает набор задач ExecutorService
, вызывая выполнение каждой из них, и возвращает результат успешного выполнения одной задачи (если было успешное выполнение):
String result = executorService.invokeAny(callableTasks);
invokeAll()
присваивает ExecutorService
набор задач , вызывая выполнение каждой из них, и возвращает результат выполнения всех задач в виде списка объектов типа Future
:
List<Future<String>> futures = executorService.invokeAll(callableTasks);
Прежде чем идти дальше, нам нужно обсудить еще два вопроса: закрытие ExecutorService
и работа с типами возврата Future .
4. Завершение работы ExecutorService
Как правило, ExecutorService
не будет автоматически уничтожен, если нет задачи для обработки. Он останется в живых и будет ждать новой работы.
В некоторых случаях это очень полезно, например, когда приложению необходимо обрабатывать задачи, которые появляются нерегулярно, или количество задач неизвестно во время компиляции.
С другой стороны, приложение может достичь своего завершения, но не быть остановленным, потому что ожидающий ExecutorService
заставит JVM продолжать работу.
Чтобы правильно закрыть ExecutorService
, у нас есть API-интерфейсы shutdown()
и shutdownNow()
.
Метод shutdown()
не вызывает немедленного уничтожения ExecutorService
. Это заставит ExecutorService
прекратить принимать новые задачи и закрыться после того, как все запущенные потоки закончат свою текущую работу:
executorService.shutdown();
Метод shutdownNow()
пытается немедленно уничтожить ExecutorService
, но не гарантирует, что все запущенные потоки будут остановлены одновременно:
List<Runnable> notExecutedTasks = executorService.shutDownNow();
Этот метод возвращает список задач, ожидающих обработки. Разработчик сам решает, что делать с этими задачами.
Один хороший способ закрыть ExecutorService
(который также рекомендуется Oracle ) — использовать оба этих метода в сочетании с методом awaitTermination()
:
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
При таком подходе ExecutorService
сначала перестанет принимать новые задачи, а затем будет ждать до указанного периода времени, пока все задачи будут выполнены. Если это время истекает, выполнение немедленно останавливается.
5. Интерфейс будущего
Методы submit()
и invokeAll()
возвращают объект или набор объектов типа Future
, что позволяет нам получить результат выполнения задачи или проверить статус задачи (выполняется ли она).
Интерфейс Future
предоставляет специальный блокирующий метод get()
, который возвращает фактический результат выполнения задачи Callable
или null
в случае задачи Runnable
:
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Вызов метода get()
во время выполнения задачи приведет к блокировке выполнения до тех пор, пока задача не выполнится должным образом и результат не будет доступен.
При очень длительной блокировке, вызванной методом get()
, производительность приложения может ухудшиться. Если полученные данные не критичны, можно избежать такой проблемы, используя таймауты:
String result = future.get(200, TimeUnit.MILLISECONDS);
Если период выполнения больше указанного (в данном случае 200 миллисекунд), будет выдано исключение TimeoutException
.
Мы можем использовать метод isDone()
, чтобы проверить, обработана ли уже назначенная задача или нет.
Интерфейс Future
также предусматривает отмену выполнения задачи с помощью метода cancel()
и проверку отмены с помощью метода isCancelled()
:
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
6. Интерфейс ScheduledExecutorService
ScheduledExecutorService запускает
задачи после некоторой предопределенной задержки и/или периодически.
Опять же, лучший способ создать экземпляр ScheduledExecutorService
— использовать фабричные методы класса Executors
.
Для этого раздела мы используем ScheduledExecutorService
с одним потоком:
ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
Чтобы запланировать выполнение отдельной задачи после фиксированной задержки, используйте метод Scheduled () службы
ScheduledExecutorService
.
Два метода schedule()
позволяют выполнять задачи Runnable
или Callable
:
Future<String> resultFuture =
executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
Метод scheduleAtFixedRate()
позволяет нам периодически запускать задачу после фиксированной задержки. Приведенный выше код делает задержку на одну секунду перед выполнением callableTask
.
Следующий блок кода запустит задачу после первоначальной задержки в 100 миллисекунд. И после этого он будет запускать одну и ту же задачу каждые 450 миллисекунд:
Future<String> resultFuture = service
.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
Если процессору требуется больше времени для выполнения назначенной задачи, чем параметр period метода
scheduleAtFixedRate()
, ScheduledExecutorService
будет ждать завершения текущей задачи перед запуском следующей.
Если необходимо иметь фиксированную задержку между итерациями задачи, следует использовать scheduleWithFixedDelay() .
Например, следующий код гарантирует 150-миллисекундную паузу между окончанием текущего выполнения и началом другого:
service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
Согласно контрактам методов scheduleAtFixedRate()
и scheduleWithFixedDelay()
, период выполнения задачи завершится при завершении ExecutorService
или при возникновении исключения во время выполнения задачи .
7. ExecutorService
против Fork/Join
После выпуска Java 7 многие разработчики решили заменить инфраструктуру ExecutorService
на инфраструктуру fork/join.
Однако это не всегда правильное решение. Несмотря на простоту и частый прирост производительности, связанный с fork/join, он снижает контроль разработчика над параллельным выполнением.
ExecutorService
дает разработчику возможность контролировать количество генерируемых потоков и степень детализации задач, которые должны выполняться отдельными потоками. Наилучший вариант использования ExecutorService
— обработка независимых задач, таких как транзакции или запросы по схеме «один поток на одну задачу».
Напротив, согласно документации Oracle , fork/join был разработан для ускорения работы, которую можно рекурсивно разбить на более мелкие части.
8. Заключение
Несмотря на относительную простоту ExecutorService
, есть несколько распространенных ошибок.
Подытожим их:
Сохранение неиспользуемого ExecutorService
в рабочем состоянии : См. подробное объяснение в Разделе 4 о том, как завершить работу ExecutorService
.
Неправильная емкость пула потоков при использовании пула потоков фиксированной длины . Очень важно определить, сколько потоков потребуется приложению для эффективного выполнения задач. Слишком большой пул потоков вызовет ненужные накладные расходы только на создание потоков, которые в основном будут находиться в режиме ожидания. Слишком малое количество может привести к тому, что приложение не будет отвечать из-за длительных периодов ожидания задач в очереди.
Вызов метода get() класса
Future
после `` отмены задачи : попытка получить результат уже отмененной задачи вызывает исключение CancellationException
.
Неожиданно долгая блокировка с помощью метода get () класса
Future
`` : мы должны использовать тайм-ауты, чтобы избежать непредвиденных ожиданий.
Как всегда, код этой статьи доступен в репозитории GitHub .