1. Обзор
Инфраструктура ExecutorService
упрощает обработку задач в нескольких потоках. Мы собираемся проиллюстрировать несколько сценариев, в которых мы ждем, пока потоки закончат свое выполнение.
Кроме того, мы покажем, как корректно закрыть ExecutorService
и дождаться завершения выполнения уже запущенных потоков.
2. После выключения Исполнителя
При использовании Executor
мы можем закрыть его, вызвав методы shutdown()
или shutdownNow() .
Хотя он не будет ждать, пока все потоки перестанут выполняться.
Ожидание завершения существующих потоков может быть достигнуто с помощью метода awaitTermination()
.
Это блокирует поток до тех пор, пока все задачи не завершат свое выполнение или не будет достигнуто указанное время ожидания:
public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
3. Использование CountDownLatch
Далее давайте рассмотрим другой подход к решению этой проблемы — использование CountDownLatch
для сигнализации о завершении задачи.
Мы можем инициализировать его значением, представляющим количество раз, которое может быть уменьшено, прежде чем все потоки, вызвавшие метод await()
, будут уведомлены.
Например, если нам нужно, чтобы текущий поток ждал, пока другие N
потоков закончат свое выполнение, мы можем инициализировать защелку, используя N
:
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// ...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();
4. Использование invokeAll()
Первый подход, который мы можем использовать для запуска потоков, — это метод invokeAll()
. Метод возвращает список объектов Future
после завершения всех задач или истечения времени ожидания .
Кроме того, мы должны отметить, что порядок возвращаемых объектов Future
такой же, как в списке предоставленных объектов Callable
:
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue(totalProcessingTime >= 3000);
String firstThreadResponse = futures.get(0).get();
assertTrue("fast thread".equals(firstThreadResponse));
String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));
5. Использование ExecutorCompletionService
Другой подход к запуску нескольких потоков — использование ExecutorCompletionService.
Он использует предоставленный ExecutorService
для выполнения задач.
Одно отличие от invokeAll()
заключается в порядке, в котором возвращаются фьючерсы,
представляющие выполненные задачи. ExecutorCompletionService
использует очередь для хранения результатов в том порядке, в котором они были завершены , тогда как invokeAll()
возвращает список, имеющий тот же последовательный порядок, что и итератор для данного списка задач:
CompletionService<String> service
= new ExecutorCompletionService<>(WORKER_THREAD_POOL);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
for (Callable<String> callable : callables) {
service.submit(callable);
}
Доступ к результатам можно получить с помощью метода take()
:
long startProcessingTime = System.currentTimeMillis();
Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;
assertTrue("First response should be from the fast thread",
"fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
&& totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");
future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;
assertTrue(
"Last response should be from the slow thread",
"slow thread".equals(secondThreadResponse));
assertTrue(
totalProcessingTime >= 3000
&& totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
6. Заключение
В зависимости от варианта использования у нас есть различные варианты ожидания завершения выполнения потоков.
CountDownLatch полезен, когда нам нужен механизм для уведомления одного или нескольких потоков о завершении набора операций, выполняемых другими потоками .
ExecutorCompletionService
полезен, когда нам нужно как можно скорее получить доступ к результату задачи, и другие подходы, когда мы хотим дождаться завершения всех запущенных задач.
Исходный код статьи доступен на GitHub .