Перейти к основному содержимому

Циклический барьер в Java

· 6 мин. чтения

1. Введение

CyclicBarriers — это конструкции синхронизации, которые были представлены в Java 5 как часть пакета java.util.concurrent .

В этой статье мы рассмотрим эту реализацию в сценарии параллелизма.

2. Параллелизм Java — синхронизаторы

Пакет java.util.concurrent содержит несколько классов, помогающих управлять набором взаимодействующих друг с другом потоков. Некоторые из них включают:

  • циклическийбарьер
  • Фазер
  • Защелка
  • Обменник
  • семафор
  • Синхронная очередь

Эти классы предлагают готовые функциональные возможности для общих шаблонов взаимодействия между потоками.

Если у нас есть набор потоков, которые взаимодействуют друг с другом и напоминают один из распространенных шаблонов, мы можем просто повторно использовать соответствующие библиотечные классы (также называемые синхронизаторами ) вместо того, чтобы пытаться придумать собственную схему, используя набор блокировок и условий. объекты и синхронизированное ключевое слово.

Давайте сосредоточимся на CyclicBarrier в будущем.

3. Циклический барьер

CyclicBarrier — это синхронизатор, который позволяет набору потоков ожидать друг от друга достижения общей точки выполнения, также называемой барьером .

CyclicBarriers используются в программах, в которых у нас есть фиксированное количество потоков, которые должны ждать, пока друг друга достигнут общей точки, прежде чем продолжить выполнение.

Барьер называется циклическим , потому что его можно использовать повторно после освобождения ожидающих потоков.

4. Использование

Конструктор CyclicBarrier прост. Требуется одно целое число, обозначающее количество потоков, которым необходимо вызвать метод await() для экземпляра барьера, чтобы обозначить достижение общей точки выполнения:

public CyclicBarrier(int parties)

Потоки, которым необходимо синхронизировать свое выполнение, также называются сторонами , и вызов метода await() — это то, как мы можем зарегистрировать, что определенный поток достиг точки барьера.

Этот вызов является синхронным, и поток, вызывающий этот метод, приостанавливает выполнение до тех пор, пока указанное количество потоков не вызовет один и тот же метод на барьере. Такая ситуация, когда требуемое количество потоков вызвало await() , называется отключением барьера .

При желании мы можем передать второй аргумент конструктору, который является экземпляром Runnable . Это имеет логику, которая будет выполняться последним потоком, преодолевающим барьер:

public CyclicBarrier(int parties, Runnable barrierAction)

5. Реализация

Чтобы увидеть CyclicBarrier в действии, давайте рассмотрим следующий сценарий:

Существует операция, которую выполняет фиксированное количество потоков и сохраняет соответствующие результаты в списке. Когда все потоки завершают выполнение своих действий, один из них (обычно последний, преодолевший барьер) начинает обработку данных, полученных каждым из них.

Давайте реализуем основной класс, в котором происходят все действия:

public class CyclicBarrierDemo {

private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults
= Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;

// ...
}

Этот класс довольно прост: NUM_WORKERS — это количество потоков, которые будут выполняться, а NUM_PARTIAL_RESULTS — это количество результатов, которые будет производить каждый из рабочих потоков.

Наконец, у нас есть partialResults — список, в котором будут храниться результаты каждого из этих рабочих потоков. Обратите внимание, что этот список является SynchronizedList , потому что несколько потоков будут записывать в него одновременно, а метод add() не является потокобезопасным для простого ArrayList .

Теперь реализуем логику каждого из рабочих потоков:

public class CyclicBarrierDemo {

// ...

class NumberCruncherThread implements Runnable {

@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();

// Crunch some numbers and store the partial result
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}

partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}

}

Теперь мы реализуем логику, которая запускается при срабатывании барьера.

Для простоты давайте просто добавим все числа в список частичных результатов:

public class CyclicBarrierDemo {

// ...

class AggregatorThread implements Runnable {

@Override
public void run() {

String thisThreadName = Thread.currentThread().getName();

System.out.println(
thisThreadName + ": Computing sum of " + NUM_WORKERS
+ " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;

for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
}
}
}

Последним шагом будет создание CyclicBarrier и начало работы с помощью метода main() :

public class CyclicBarrierDemo {

// Previous code

public void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;

cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());

System.out.println("Spawning " + NUM_WORKERS
+ " worker threads to compute "
+ NUM_PARTIAL_RESULTS + " partial results each");

for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}

public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.runSimulation(5, 3);
}
}

В приведенном выше коде мы инициализировали циклический барьер с 5 потоками, каждый из которых производит 3 целых числа как часть своих вычислений и сохраняет их в результирующем списке.

Как только барьер отключен, последний поток, отключивший барьер, выполняет логику, указанную в AggregatorThread, а именно — складывает все числа, произведенные потоками.

6. Результаты

Вот результат одного выполнения вышеуказанной программы — каждое выполнение может создавать разные результаты, поскольку потоки могут создаваться в другом порядке:

Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2
Adding 2 0 5
Adding 6 4 0
Adding 1 1 0
Adding 9 3 5
Thread 4: Final result = 46

Как видно из приведенного выше вывода, поток 4 — это тот, который отключает барьер, а также выполняет логику финальной агрегации. Также нет необходимости, чтобы потоки выполнялись в том порядке, в котором они были запущены, как показано в приведенном выше примере.

7. Заключение

В этой статье мы увидели, что такое CyclicBarrier и в каких ситуациях он полезен.

Мы также реализовали сценарий, в котором нам требовалось фиксированное количество потоков для достижения фиксированной точки выполнения, прежде чем продолжить другую программную логику.

Как всегда, код руководства можно найти на GitHub .