1. Обзор
Многопоточное программирование позволяет нам запускать потоки одновременно, и каждый поток может выполнять разные задачи. Таким образом, он оптимально использует ресурсы, особенно когда наш компьютер имеет несколько многоядерных процессоров или несколько процессоров.
Иногда нам нужно управлять несколькими потоками, чтобы они запускались одновременно.
В этом уроке мы сначала поймем требование, особенно значение «точно в то же время». Кроме того, мы рассмотрим, как запустить два потока одновременно в Java.
2. Понимание требования
Наше требование: «запускать два потока одновременно».
Это требование кажется простым для понимания. Однако, если мы хорошенько об этом подумаем, возможно ли запустить два потока в ТОЧНОЕ
одно и то же время?
Прежде всего, каждый поток будет потреблять процессорное время для работы. Поэтому, если наше приложение работает на компьютере с одноядерным процессором, запустить два потока одновременно
невозможно .
Если наш компьютер имеет многоядерный процессор или несколько процессоров, два потока могут запускаться в одно
и то же время. Однако мы не можем контролировать это на стороне Java.
Это связано с тем, что когда мы работаем с потоками в Java, планирование потоков Java зависит от планирования потоков операционной системы . Таким образом, разные операционные системы могут обрабатывать его по-разному.
Более того, если мы обсудим «точно такое же время» более строго, согласно специальной теории относительности Эйнштейна :
Невозможно в абсолютном смысле сказать, что два разных события происходят одновременно, если эти события разделены в пространстве.
Независимо от того, насколько близко наши процессоры расположены на материнской плате или ядра, расположенные в процессоре, есть пробелы. Поэтому мы не можем гарантировать, что два потока запустятся в ТОЧНОЕ
одно и то же время.
Значит ли это, что требование недействительно?
Нет. Это действительное требование. Даже если мы не можем заставить два потока запускаться в ТОЧНОЕ
одно и то же время, мы можем приблизиться к ним с помощью некоторых методов синхронизации.
Эти методы могут помочь нам в большинстве практических случаев, когда нам нужно, чтобы два потока запускались «в одно и то же время».
В этом уроке мы рассмотрим два подхода к решению этой проблемы:
- Использование класса
CountDownLatch
- Использование класса
CyclicBarrier
- Использование класса
Phaser
Все подходы следуют одной и той же идее: мы не будем запускать два потока одновременно. Вместо этого мы блокируем потоки сразу после запуска потоков и пытаемся возобновить их выполнение одновременно.
Поскольку наши тесты будут связаны с планированием потоков, стоит упомянуть среду для запуска тестов в этом руководстве:
- ЦП: ЦП Intel(R) Core(TM) i7-8850H. Тактовая частота процессора составляет от 2,6 до 4,3 ГГц (4.1 с 4 ядрами, 4 ГГц с 6 ядрами).
- Операционная система: 64-разрядная версия Linux с ядром версии 5.12.12.
- Ява: Ява 11
Теперь давайте посмотрим на CountDonwLatch
и CyclicBarrier
в действии.
3. Использование класса CountDownLatch
CountDownLatch
— это синхронизатор, представленный в Java 5 как часть пакета java.util.concurrent
. Обычно мы используем CountDownLatch
для блокировки потоков до тех пор, пока другие потоки не завершат свои задачи.
Проще говоря, мы устанавливаем счетчик
в объекте- защелке
и связываем объект- защелку
с некоторыми потоками. Когда мы запускаем эти потоки, они будут заблокированы до тех пор, пока счетчик защелки не станет равным нулю. ``
С другой стороны, в других потоках мы можем контролировать, при каких условиях мы уменьшаем количество
и позволяем возобновить заблокированные потоки, например, когда некоторые задачи в основном потоке выполнены.
3.1. Рабочий поток
Теперь давайте посмотрим, как решить нашу проблему с помощью класса CountDownLatch
.
Во-первых, мы создадим наш класс Thread .
Назовем его WorkerWithCountDownLatch
:
public class WorkerWithCountDownLatch extends Thread {
private CountDownLatch latch;
public WorkerWithCountDownLatch(String name, CountDownLatch latch) {
this.latch = latch;
setName(name);
}
@Override public void run() {
try {
System.out.printf("[ %s ] created, blocked by the latch...\n", getName());
latch.await();
System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
// do actual work here...
} catch (InterruptedException e) {
// handle exception
}
}
Мы добавили объект- защелку
в наш класс WorkerWithCountDownLatch
. Во-первых, давайте разберемся с функцией объекта- защелки
.
В методе run()
мы вызываем метод latch.await().
Это означает, что если мы запустим рабочий
поток, он проверит количество защелок.
Поток будет заблокирован до тех пор, пока счетчик
не станет равным нулю.
Таким образом, мы можем создать защелку CountDownLatch(1)
со значением count=1
в основном потоке и связать объект- защелку
с двумя рабочими потоками, которые мы хотим запускать одновременно.
Когда мы хотим, чтобы два потока возобновили выполнение своей реальной работы, мы освобождаем защелку, вызывая latch.countDown()
в основном потоке.
Далее давайте посмотрим, как основной поток управляет двумя рабочими потоками.
3.2. Основная тема
Мы реализуем основной поток в методе usingCountDownLatch()
:
private static void usingCountDownLatch() throws InterruptedException {
System.out.println("===============================================");
System.out.println(" >>> Using CountDownLatch <<<<");
System.out.println("===============================================");
CountDownLatch latch = new CountDownLatch(1);
WorkerWithCountDownLatch worker1 = new WorkerWithCountDownLatch("Worker with latch 1", latch);
WorkerWithCountDownLatch worker2 = new WorkerWithCountDownLatch("Worker with latch 2", latch);
worker1.start();
worker2.start();
Thread.sleep(10);//simulation of some actual work
System.out.println("-----------------------------------------------");
System.out.println(" Now release the latch:");
System.out.println("-----------------------------------------------");
latch.countDown();
}
Теперь давайте вызовем метод usingCountDownLatch()
выше из нашего метода main()
. Когда мы запустим метод main()
, мы увидим вывод:
===============================================
>>> Using CountDownLatch <<<<
===============================================
[ Worker with latch 1 ] created, blocked by the latch
[ Worker with latch 2 ] created, blocked by the latch
-----------------------------------------------
Now release the latch:
-----------------------------------------------
[ Worker with latch 2 ] starts at: 2021-06-27T16:00:52.268532035Z
[ Worker with latch 1 ] starts at: 2021-06-27T16:00:52.268533787Z
Как видно из приведенного выше вывода, два рабочих потока запустились почти
одновременно. Разница между двумя начальными временами составляет менее двух микросекунд .
4. Использование класса CyclicBarrier
Класс CyclicBarrier
— еще один синхронизатор, представленный в Java 5. По сути, CyclicBarrier
позволяет фиксированному числу потоков ожидать, пока друг друга не достигнут общей точки, прежде чем продолжить выполнение .
Далее давайте посмотрим, как мы решим нашу проблему, используя класс CyclicBarrier
.
4.1. Рабочий поток
Давайте сначала посмотрим на реализацию нашего рабочего потока:
public class WorkerWithCyclicBarrier extends Thread {
private CyclicBarrier barrier;
public WorkerWithCyclicBarrier(String name, CyclicBarrier barrier) {
this.barrier = barrier;
this.setName(name);
}
@Override public void run() {
try {
System.out.printf("[ %s ] created, blocked by the barrier\n", getName());
barrier.await();
System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
// do actual work here...
} catch (InterruptedException | BrokenBarrierException e) {
// handle exception
}
}
}
Реализация довольно проста. Мы связываем объект- барьер
с рабочими потоками. Когда поток запускается, мы немедленно вызываем метод барьера .
Таким образом, рабочий поток будет заблокирован и будет ожидать возобновления выполнения всеми сторонами вызова барьера
.
4.2. Основная тема
Далее давайте посмотрим, как управлять двумя рабочими потоками, возобновляющими работу в основном потоке:
private static void usingCyclicBarrier() throws BrokenBarrierException, InterruptedException {
System.out.println("\n===============================================");
System.out.println(" >>> Using CyclicBarrier <<<<");
System.out.println("===============================================");
CyclicBarrier barrier = new CyclicBarrier(3);
WorkerWithCyclicBarrier worker1 = new WorkerWithCyclicBarrier("Worker with barrier 1", barrier);
WorkerWithCyclicBarrier worker2 = new WorkerWithCyclicBarrier("Worker with barrier 2", barrier);
worker1.start();
worker2.start();
Thread.sleep(10);//simulation of some actual work
System.out.println("-----------------------------------------------");
System.out.println(" Now open the barrier:");
System.out.println("-----------------------------------------------");
barrier.await();
}
Наша цель — разрешить одновременное возобновление работы двух рабочих потоков. Итак, вместе с основным потоком у нас всего три потока.
Как показано в приведенном выше методе, мы создаем объект- барьер
с тремя сторонами в основном потоке. Далее мы создаем и запускаем два рабочих потока.
Как мы обсуждали ранее, два рабочих потока заблокированы и ожидают возобновления работы барьера.
В основном потоке мы можем сделать некоторую реальную работу. Когда мы решаем открыть барьер, мы вызываем метод барьер. ожидание ()
, чтобы позволить двум рабочим процессам продолжить выполнение.
Если мы вызовем usingCyclicBarrier()
в методе main()
, мы получим вывод:
===============================================
>>> Using CyclicBarrier <<<<
===============================================
[ Worker with barrier 1 ] created, blocked by the barrier
[ Worker with barrier 2 ] created, blocked by the barrier
-----------------------------------------------
Now open the barrier:
-----------------------------------------------
[ Worker with barrier 1 ] starts at: 2021-06-27T16:00:52.311346392Z
[ Worker with barrier 2 ] starts at: 2021-06-27T16:00:52.311348874Z
Мы можем сравнить два времени запуска рабочих. Даже если два рабочих не запустятся в одно и то же время, мы довольно близки к нашей цели: разница между двумя моментами запуска составляет менее трех микросекунд.
5. Использование класса Phaser
Класс Phaser
— это синхронизатор, представленный в Java 7. Он похож на CyclicBarrier
и CountDownLatch
. Однако класс Phaser
более гибкий.
Например, в отличие от CyclicBarrier
и CountDownLatch
, Phaser
позволяет нам динамически регистрировать стороны потока.
Далее решим задачу с помощью Phaser
.
5.1. Рабочий поток
Как обычно, мы сначала посмотрим на реализацию, а затем поймем, как она работает:
public class WorkerWithPhaser extends Thread {
private Phaser phaser;
public WorkerWithPhaser(String name, Phaser phaser) {
this.phaser = phaser;
phaser.register();
setName(name);
}
@Override public void run() {
try {
System.out.printf("[ %s ] created, blocked by the phaser\n", getName());
phaser.arriveAndAwaitAdvance();
System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
// do actual work here...
} catch (IllegalStateException e) {
// handle exception
}
}
}
Когда создается экземпляр рабочего потока, мы регистрируем текущий поток в заданном объекте Phaser , вызывая
Phaser.register()
. Таким образом, текущая работа становится одной из сторон фазерного
барьера.
Затем, когда рабочий поток запускается, мы немедленно вызываем Phaser.arriveAndAwaitAdvance()
. Таким образом, мы сообщаем Phaser
, что текущий поток прибыл и будет ждать прибытия других сторон потока, чтобы продолжить. Конечно, до прибытия других участников потока текущий поток блокируется.
5.2. Основная тема
Далее пойдем дальше и посмотрим на реализацию основного потока:
private static void usingPhaser() throws InterruptedException {
System.out.println("\n===============================================");
System.out.println(" >>> Using Phaser <<<");
System.out.println("===============================================");
Phaser phaser = new Phaser();
phaser.register();
WorkerWithPhaser worker1 = new WorkerWithPhaser("Worker with phaser 1", phaser);
WorkerWithPhaser worker2 = new WorkerWithPhaser("Worker with phaser 2", phaser);
worker1.start();
worker2.start();
Thread.sleep(10);//simulation of some actual work
System.out.println("-----------------------------------------------");
System.out.println(" Now open the phaser barrier:");
System.out.println("-----------------------------------------------");
phaser.arriveAndAwaitAdvance();
}
В коде выше, как мы видим, основной поток регистрирует себя как поток-участник объекта Phaser
.
После того, как мы создали и заблокировали два рабочих
потока, основной поток также вызывает Phaser.arriveAndAwaitAdvance()
. Таким образом, мы открываем фазерный барьер, чтобы два рабочих
потока могли возобновить работу одновременно.
Наконец, давайте вызовем метод usingPhaser()
в методе main()
:
===============================================
>>> Using Phaser <<<
===============================================
[ Worker with phaser 1 ] created, blocked by the phaser
[ Worker with phaser 2 ] created, blocked by the phaser
-----------------------------------------------
Now open the phaser barrier:
-----------------------------------------------
[ Worker with phaser 2 ] starts at: 2021-07-18T17:39:27.063523636Z
[ Worker with phaser 1 ] starts at: 2021-07-18T17:39:27.063523827Z
Точно так же два рабочих потока стартовали почти
одновременно. Разница между двумя начальными временами составляет менее двух микросекунд .
6. Заключение
В этой статье мы впервые обсудили требование: «запускать два потока одновременно».
Далее мы рассмотрели два подхода к одновременному запуску трех потоков: использование CountDownLatch
, CyclicBarrier
и Phaser
.
Их идеи похожи, блокируя два потока и пытаясь позволить им возобновить выполнение одновременно.
Несмотря на то, что эти подходы не могут гарантировать запуск двух потоков в одно и то же время, результат довольно близок и достаточен для большинства случаев в реальном мире.
Как всегда, код статьи можно найти на GitHub .