1. Обзор
В этом уроке мы рассмотрим один из самых фундаментальных механизмов в Java — синхронизацию потоков.
Сначала мы обсудим некоторые важные термины и методологии, связанные с параллелизмом.
И мы разработаем простое приложение, в котором мы будем иметь дело с проблемами параллелизма, с целью лучшего понимания wait()
и notify()
.
2. Синхронизация потоков в Java
В многопоточной среде несколько потоков могут попытаться изменить один и тот же ресурс. Неправильное управление потоками, конечно, приведет к проблемам согласованности.
2.1. Охраняемые блоки в Java
Одним из инструментов, который мы можем использовать для координации действий нескольких потоков в Java, являются защищенные блоки. Такие блоки проверяют определенное условие перед возобновлением выполнения.
Имея это в виду, мы будем использовать следующее:
Object.wait()
для приостановки потокаObject.notify()
для пробуждения потока
Мы можем лучше понять это из следующей диаграммы, изображающей жизненный цикл Thread
:
Обратите внимание, что существует множество способов управления этим жизненным циклом. Однако в этой статье мы сосредоточимся только на wait()
и notify()
.
3. Метод ожидания ()
Проще говоря, вызов wait()
заставляет текущий поток ждать, пока какой-либо другой поток не вызовет notify( )
или notifyAll()
для того же объекта.
Для этого текущий поток должен владеть монитором объекта . Согласно Javadocs , это может произойти следующими способами:
- когда мы выполнили метод
синхронизированного
экземпляра для данного объекта - когда мы выполнили тело
синхронизированного
блока для данного объекта - путем выполнения
синхронизированных статических
методов для объектов типаClass
Обратите внимание, что только один активный поток может одновременно владеть монитором объекта.
Этот метод wait()
поставляется с тремя перегруженными сигнатурами. Давайте посмотрим на это.
3.1. ждать()
Метод wait()
заставляет текущий поток бесконечно ждать, пока другой поток не вызовет notify( )
для этого объекта или notifyAll()
.
3.2. ждать (длительный тайм-аут)
Используя этот метод, мы можем указать тайм-аут, после которого поток будет автоматически разбужен. Поток можно разбудить до достижения тайм-аута с помощью notify( )
или notifyAll()
.
Обратите внимание, что вызов wait(0)
аналогичен вызову wait()
.
3.3. ожидание (длинный тайм-аут, int nanos)
Это еще одна подпись, обеспечивающая ту же функциональность. Единственная разница здесь в том, что мы можем обеспечить более высокую точность.
Общий период ожидания (в наносекундах) рассчитывается как 1_000_000*timeout + nanos
.
4. уведомить()
и уведомить всех()
Мы используем метод notify()
для пробуждения потоков, ожидающих доступа к монитору этого объекта.
Существует два способа уведомления ожидающих потоков.
4.1. уведомлять()
Для всех потоков, ожидающих на мониторе этого объекта (используя любой из методов wait()
), метод notify()
уведомляет любой из них о произвольном пробуждении. Выбор того, какой именно поток разбудить, недетерминирован и зависит от реализации.
Поскольку notify()
пробуждает один случайный поток, мы можем использовать его для реализации взаимоисключающей блокировки, когда потоки выполняют схожие задачи. Но в большинстве случаев было бы целесообразнее реализовать notifyAll()
.
4.2. уведомить всех ()
Этот метод просто пробуждает все потоки, ожидающие на мониторе этого объекта.
Пробужденные потоки завершатся обычным образом, как и любые другие потоки.
Но прежде чем разрешить их выполнение, всегда определяйте быструю проверку условия, необходимого для продолжения работы с потоком. Это связано с тем, что могут быть ситуации, когда поток был разбужен без получения уведомления (этот сценарий обсуждается позже в примере).
5. Проблема синхронизации отправителя и получателя
Теперь, когда мы познакомились с основами, давайте рассмотрим простое приложение Sender
- Receiver
, которое будет использовать методы wait()
и notify()
для настройки синхронизации между ними:
Отправитель
должен отправить пакетданных
Получателю .- Получатель не может обработать пакет данных, пока
Отправитель не
завершит
его отправку. `` - Точно так же
отправитель
не должен пытаться отправить другой пакет, если толькополучатель
уже не обработал предыдущий пакет.
Давайте сначала создадим класс Data , состоящий из
пакета
данных , который будет отправлен от Sender
к Receiver
. Мы будем использовать wait()
и notifyAll()
для настройки синхронизации между ними:
public class Data {
private String packet;
// True if receiver should wait
// False if sender should wait
private boolean transfer = true;
public synchronized String receive() {
while (transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
transfer = true;
String returnPacket = packet;
notifyAll();
return returnPacket;
}
public synchronized void send(String packet) {
while (!transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
transfer = false;
this.packet = packet;
notifyAll();
}
}
Давайте разберем, что здесь происходит:
Переменная
пакета
обозначает данные, которые передаются по сети.У нас есть
логическая
переменнаяtransfer
, которуюОтправитель
иПолучатель
будут использовать для синхронизации:Если эта переменная имеет значение
true
,Получатель
должен ждать, покаОтправитель
отправит сообщение.Если это
false
,отправитель
должен ждать, покаполучатель
получит сообщение.Отправитель
использует метод send ()
для отправки данныхполучателю
:Если
transfer
равенfalse
, мы будем ждать, вызвавwait()
в этом потоке.Но когда это
правда
, мы переключаем статус, устанавливаем наше сообщение и вызываемnotifyAll()
, чтобы разбудить другие потоки, чтобы указать, что произошло важное событие, и они могут проверить, могут ли они продолжить выполнение.Точно так же
Receiver
будет использовать методreceive()
:Если
Sender
установил дляпередачи значение
false
, только тогда она продолжится, иначе мы вызовемwait()
в этом потоке.Когда условие выполняется, мы переключаем статус, уведомляем все ожидающие потоки о пробуждении и возвращаем полученный пакет данных.
5.1. Зачем заключать ожидание()
в цикл while
?
Поскольку notify( )
и notifyAll()
случайным образом пробуждают потоки, ожидающие на мониторе этого объекта, выполнение условия не всегда важно. Иногда поток пробуждается, но условие на самом деле еще не выполнено.
Мы также можем определить проверку, чтобы уберечь нас от ложных пробуждений, когда поток может выйти из ожидания, даже не получив уведомления.
5.2. Зачем нам нужно синхронизировать методы end()
и Receive()
?
Мы поместили эти методы внутри синхронизированных
методов, чтобы обеспечить встроенные блокировки. Если поток, вызывающий метод wait()
, не владеет встроенной блокировкой, будет выдана ошибка.
Теперь мы создадим Sender
и Receiver
и реализуем интерфейс Runnable
для обоих, чтобы их экземпляры могли выполняться потоком.
Сначала посмотрим, как будет работать Sender :
public class Sender implements Runnable {
private Data data;
// standard constructors
public void run() {
String packets[] = {
"First packet",
"Second packet",
"Third packet",
"Fourth packet",
"End"
};
for (String packet : packets) {
data.send(packet);
// Thread.sleep() to mimic heavy server-side processing
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}
Давайте внимательнее посмотрим на этого отправителя
:
- Мы создаем несколько случайных пакетов данных, которые будут отправляться по сети в виде массива
пакетов [] .
- Для каждого пакета мы просто вызываем send().
- Затем мы вызываем
Thread.sleep()
со случайным интервалом, чтобы имитировать тяжелую обработку на стороне сервера.
Наконец, давайте реализуем наш Receiver
:
public class Receiver implements Runnable {
private Data load;
// standard constructors
public void run() {
for(String receivedMessage = load.receive();
!"End".equals(receivedMessage);
receivedMessage = load.receive()) {
System.out.println(receivedMessage);
// ...
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}
Здесь мы просто вызываем load.receive()
в цикле, пока не получим последний пакет данных End .
Давайте теперь посмотрим на это приложение в действии:
public static void main(String[] args) {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));
sender.start();
receiver.start();
}
Мы получим следующий вывод:
First packet
Second packet
Third packet
Fourth packet
И вот мы здесь. Мы получили все пакеты данных в правильном последовательном порядке и успешно установили правильную связь между отправителем и получателем.
6. Заключение
В этой статье мы обсудили некоторые основные концепции синхронизации в Java. В частности, мы сосредоточились на том, как мы можем использовать wait()
и notify()
для решения интересных проблем с синхронизацией. Наконец, мы рассмотрели пример кода, в котором мы применили эти концепции на практике.
Прежде чем мы закончим, стоит упомянуть, что все эти низкоуровневые API, такие как wait()
, notify( )
и notifyAll()
, являются традиционными методами, которые хорошо работают, но механизмы более высокого уровня часто проще и лучше — например, Java. родные интерфейсы Lock
and Condition (доступны в пакете
java.util.concurrent.locks
).
Для получения дополнительной информации о пакете java.util.concurrent
посетите наш обзор статьи java.util.concurrent. Блокировка
и условие
описаны в руководстве по java.util.concurrent.Locks .
Как всегда, полные фрагменты кода, используемые в этой статье, доступны на GitHub.