Перейти к основному содержимому

ExecutorService — ожидание завершения потоков

· 4 мин. чтения

1. Обзор

Инфраструктура ExecutorService упрощает обработку задач в нескольких потоках. Мы собираемся проиллюстрировать несколько сценариев, в которых мы ждем, пока потоки закончат свое выполнение.

Кроме того, мы покажем, как корректно закрыть ExecutorService и дождаться завершения выполнения уже запущенных потоков.

2. После выключения Исполнителя

При использовании Executor мы можем закрыть его, вызвав методы shutdown() или shutdownNow() . Хотя он не будет ждать, пока все потоки перестанут выполняться.

Ожидание завершения существующих потоков может быть достигнуто с помощью метода awaitTermination() .

Это блокирует поток до тех пор, пока все задачи не завершат свое выполнение или не будет достигнуто указанное время ожидания:

public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}

3. Использование CountDownLatch

Далее давайте рассмотрим другой подход к решению этой проблемы — использование CountDownLatch для сигнализации о завершении задачи.

Мы можем инициализировать его значением, представляющим количество раз, которое может быть уменьшено, прежде чем все потоки, вызвавшие метод await() , будут уведомлены.

Например, если нам нужно, чтобы текущий поток ждал, пока другие N потоков закончат свое выполнение, мы можем инициализировать защелку, используя N :

ExecutorService WORKER_THREAD_POOL 
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// ...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

4. Использование invokeAll()

Первый подход, который мы можем использовать для запуска потоков, — это метод invokeAll() . Метод возвращает список объектов Future после завершения всех задач или истечения времени ожидания .

Кроме того, мы должны отметить, что порядок возвращаемых объектов Future такой же, как в списке предоставленных объектов Callable :

ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);

List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));

long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;

assertTrue(totalProcessingTime >= 3000);

String firstThreadResponse = futures.get(0).get();

assertTrue("fast thread".equals(firstThreadResponse));

String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));

5. Использование ExecutorCompletionService

Другой подход к запуску нескольких потоков — использование ExecutorCompletionService. Он использует предоставленный ExecutorService для выполнения задач.

Одно отличие от invokeAll() заключается в порядке, в котором возвращаются фьючерсы, представляющие выполненные задачи. ExecutorCompletionService использует очередь для хранения результатов в том порядке, в котором они были завершены , тогда как invokeAll() возвращает список, имеющий тот же последовательный порядок, что и итератор для данного списка задач:

CompletionService<String> service
= new ExecutorCompletionService<>(WORKER_THREAD_POOL);

List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));

for (Callable<String> callable : callables) {
service.submit(callable);
}

Доступ к результатам можно получить с помощью метода take() :

long startProcessingTime = System.currentTimeMillis();

Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;

assertTrue("First response should be from the fast thread",
"fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
&& totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");

future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;

assertTrue(
"Last response should be from the slow thread",
"slow thread".equals(secondThreadResponse));
assertTrue(
totalProcessingTime >= 3000
&& totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

6. Заключение

В зависимости от варианта использования у нас есть различные варианты ожидания завершения выполнения потоков.

CountDownLatch полезен, когда нам нужен механизм для уведомления одного или нескольких потоков о завершении набора операций, выполняемых другими потоками .

ExecutorCompletionService полезен, когда нам нужно как можно скорее получить доступ к результату задачи, и другие подходы, когда мы хотим дождаться завершения всех запущенных задач.

Исходный код статьи доступен на GitHub .