1. Обзор
В этой статье мы рассмотрим конструкцию Phaser
из пакета java.util.concurrent
. Это очень похожая конструкция на CountDownLatch
, которая позволяет нам координировать выполнение потоков. По сравнению с CountDownLatch
имеет некоторые дополнительные функции.
Phaser
— это барьер, на котором динамическое количество потоков должно ждать, прежде чем продолжить выполнение. В CountDownLatch
это число нельзя настроить динамически, и его необходимо указать при создании экземпляра.
2. Фазер
API
Phaser позволяет
нам построить логику, в которой потоки должны ждать на барьере, прежде чем перейти к следующему шагу выполнения .
Мы можем координировать несколько фаз выполнения, повторно используя экземпляр Phaser
для каждой фазы программы. На каждой фазе может быть разное количество потоков, ожидающих перехода к другой фазе. Позже мы рассмотрим пример использования фаз.
Чтобы участвовать в координации, поток должен зарегистрировать
себя в экземпляре Phaser .
Обратите внимание, что это только увеличивает количество зарегистрированных сторон, и мы не можем проверить, зарегистрирован ли текущий поток — нам пришлось бы создать подкласс реализации, чтобы поддерживать это.
Поток сигнализирует о том, что он достиг барьера, вызывая метод прибытияAndAwaitAdvance()
, который является блокирующим методом. Когда количество прибывших участников сравняется с количеством зарегистрированных, выполнение программы продолжится , а номер этапа будет увеличиваться. Мы можем получить номер текущей фазы, вызвав метод getPhase()
.
Когда поток завершает свою работу, мы должны вызвать метод прибытияAndDeregister()
, чтобы сообщить, что текущий поток больше не должен учитываться на этой конкретной фазе.
3. Реализация логики с помощью Phaser
API
Допустим, мы хотим скоординировать несколько этапов действий. Три потока будут обрабатывать первую фазу, а две — вторую.
Мы создадим класс LongRunningAction
, реализующий интерфейс Runnable :
class LongRunningAction implements Runnable {
private String threadName;
private Phaser ph;
LongRunningAction(String threadName, Phaser ph) {
this.threadName = threadName;
this.ph = ph;
ph.register();
}
@Override
public void run() {
ph.arriveAndAwaitAdvance();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
ph.arriveAndDeregister();
}
}
Когда создается экземпляр нашего класса действий, мы регистрируемся в экземпляре Phaser с помощью метода
register()
. Это увеличит количество потоков, использующих этот конкретный Phaser.
Вызов функции прибытияAndAwaitAdvance()
заставит текущий поток ожидать на барьере. Как уже было сказано, когда количество прибывших партий сравняется с количеством зарегистрированных, исполнение продолжится.
После завершения обработки текущий поток отменяет регистрацию, вызывая метод прибытияAndDeregister()
.
Давайте создадим тестовый пример, в котором мы запустим три потока LongRunningAction
и заблокируем барьер. Далее, после завершения действия, мы создадим два дополнительных потока LongRunningAction
, которые будут выполнять обработку следующей фазы.
При создании экземпляра Phaser
из основного потока мы передаем 1
в качестве аргумента. Это эквивалентно вызову метода register()
из текущего потока. Мы делаем это потому, что когда мы создаем три рабочих потока, главный поток является координатором, и поэтому Phaser
должен иметь четыре зарегистрированных в нем потока:
ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);
assertEquals(0, ph.getPhase());
Фаза после инициализации равна нулю.
Класс Phaser
имеет конструктор, в котором мы можем передать ему родительский экземпляр. Это полезно в тех случаях, когда у нас есть большое количество сторон, которые могут столкнуться с огромными затратами на синхронизацию. В таких ситуациях экземпляры фазеров
могут быть настроены таким образом, чтобы группы подфазеров имели общего родителя.
Далее запустим три потока действий LongRunningAction
, которые будут ожидать на барьере, пока мы не вызовем метод прибытияAndAwaitAdvance()
из основного потока.
Имейте в виду, что мы инициализировали наш Phaser значением
1
и вызвали register()
еще три раза. Итак, три потока действий объявили, что достигли барьера, поэтому нужен еще один вызов appendAndAwaitAdvance()
— тот, что из основного потока:
executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));
ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase());
После завершения этой фазы метод getPhase()
вернет единицу, потому что программа завершила обработку первого шага выполнения.
Допустим, два потока должны провести следующую фазу обработки. Для этого мы можем использовать Phaser
, потому что он позволяет нам динамически настраивать количество потоков, которые должны ожидать на барьере. Мы запускаем два новых потока, но они не будут выполняться до тех пор, пока из основного потока не будет вызвана функция прибытияAndAwaitAdvance()
(так же, как и в предыдущем случае):
executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();
assertEquals(2, ph.getPhase());
ph.arriveAndDeregister();
После этого метод getPhase()
вернет номер фазы, равный двум. Когда мы хотим закончить нашу программу, нам нужно вызвать метод прибытияAndDeregister()
, так как основной поток все еще зарегистрирован в Phaser.
Когда отмена регистрации приводит к тому, что количество зарегистрированных сторон становится равным нулю, Phaser
завершается .
Все вызовы методов синхронизации больше не будут блокироваться и немедленно вернутся.
Запуск программы приведет к следующему выводу (полный исходный код с операторами строки печати можно найти в репозитории кода):
This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action
Мы видим, что все потоки ждут выполнения, пока барьер не откроется. Следующая фаза выполнения выполняется только после успешного завершения предыдущей.
4. Вывод
В этом руководстве мы рассмотрели конструкцию Phaser
из java.util.concurrent
и реализовали логику координации с несколькими фазами, используя класс Phaser .
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.