Перейти к основному содержимому

Руководство по CountDownLatch в Java

· 5 мин. чтения

1. Введение

В этой статье мы дадим руководство по классу CountDownLatch и продемонстрируем, как его можно использовать, на нескольких практических примерах.

По сути, используя CountDownLatch , мы можем заставить поток блокироваться до тех пор, пока другие потоки не завершат данную задачу.

2. Использование в параллельном программировании

Проще говоря, CountDownLatch имеет поле счетчика , которое вы можете уменьшать по мере необходимости. Затем мы можем использовать его, чтобы заблокировать вызывающий поток, пока он не обнулится.

Если бы мы выполняли некоторую параллельную обработку, мы могли бы создать экземпляр CountDownLatch с тем же значением счетчика, что и количество потоков, с которыми мы хотим работать. Затем мы могли бы просто вызывать countdown() после завершения каждого потока, гарантируя, что зависимый поток, вызывающий await() , будет заблокирован до тех пор, пока не закончатся рабочие потоки.

3. Ожидание завершения пула потоков

Давайте попробуем этот шаблон, создав Worker и используя поле CountDownLatch , чтобы сигнализировать о его завершении:

public class Worker implements Runnable {
private List<String> outputScraper;
private CountDownLatch countDownLatch;

public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
doSomeWork();
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}

Затем давайте создадим тест, чтобы доказать, что мы можем заставить CountDownLatch ожидать завершения экземпляров Worker :

@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
outputScraper.add("Latch released");

assertThat(outputScraper)
.containsExactly(
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Latch released"
);
}

Естественно, «Защелка освобождена» всегда будет последним выводом, так как это зависит от освобождения CountDownLatch .

Обратите внимание, что если бы мы не вызвали await() , мы не смогли бы гарантировать порядок выполнения потоков, поэтому тест случайным образом провалился бы.

4. Пул потоков, ожидающих начала

Если мы возьмем предыдущий пример, но на этот раз запустим тысячи потоков вместо пяти, вполне вероятно, что многие из более ранних потоков закончат обработку еще до того, как мы вызовем start() для более поздних. Это может затруднить попытку воспроизвести проблему параллелизма, поскольку мы не сможем заставить все наши потоки работать параллельно.

Чтобы обойти это, давайте заставим CountdownLatch работать иначе, чем в предыдущем примере. Вместо того, чтобы блокировать родительский поток до завершения некоторых дочерних потоков, мы можем заблокировать каждый дочерний поток до тех пор, пока не запустятся все остальные.

Давайте изменим наш метод run() , чтобы он блокировался перед обработкой:

public class WaitingWorker implements Runnable {

private List<String> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;

public WaitingWorker(
List<String> outputScraper,
CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker,
CountDownLatch completedThreadCounter) {

this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}

@Override
public void run() {
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
doSomeWork();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}

Теперь давайте изменим наш тест, чтобы он блокировался до тех пор , пока не запустятся все рабочие процессы, разблокировал рабочие процессы, а затем блокировался, пока рабочие не завершили работу:

@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new WaitingWorker(
outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
readyThreadCounter.await();
outputScraper.add("Workers ready");
callingThreadBlocker.countDown();
completedThreadCounter.await();
outputScraper.add("Workers complete");

assertThat(outputScraper)
.containsExactly(
"Workers ready",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Workers complete"
);
}

Этот паттерн действительно полезен для попытки воспроизвести ошибки параллелизма, так как его можно использовать, чтобы заставить тысячи потоков пытаться выполнять некоторую логику параллельно.

5. Раннее завершение CountdownLatch

Иногда мы можем столкнуться с ситуацией, когда рабочие процессы завершаются с ошибкой до обратного отсчета CountDownLatch. Это может привести к тому, что он никогда не достигнет нуля, а await() никогда не завершится:

@Override
public void run() {
if (true) {
throw new RuntimeException("Oh dear, I'm a BrokenWorker");
}
countDownLatch.countDown();
outputScraper.add("Counted down");
}

Давайте изменим наш предыдущий тест, чтобы использовать BrokenWorker, чтобы показать, как await() будет блокироваться навсегда:

@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
}

Ясно, что это не то поведение, которое нам нужно — для приложения было бы гораздо лучше продолжать работу, чем бесконечно блокироваться.

Чтобы обойти это, давайте добавим аргумент тайм-аута к нашему вызову await().

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

Как мы видим, тест в конце концов истечет, и await() вернет false .

6. Заключение

В этом кратком руководстве мы продемонстрировали, как можно использовать CountDownLatch для блокировки потока до тех пор, пока другие потоки не закончат некоторую обработку.

Мы также показали, как его можно использовать для отладки проблем параллелизма, обеспечив параллельное выполнение потоков.

Реализацию этих примеров можно найти на GitHub ; это проект на основе Maven, поэтому его легко запустить как есть.