1. Обзор
В этом руководстве мы узнаем, как реализовать проблему «производитель-потребитель» в Java. Эта проблема также известна как проблема ограниченного буфера .
Для получения более подробной информации о проблеме мы можем обратиться к вики-странице « Проблема производителя-потребителя» . Чтобы узнать об основах многопоточности и параллелизма в Java, обязательно ознакомьтесь с нашей статьей о параллелизме в Java .
2. Проблема производитель-потребитель
Производитель и потребитель — это два отдельных процесса. Оба процесса совместно используют общий буфер или очередь. Производитель постоянно производит определенные данные и помещает их в буфер, тогда как потребитель потребляет эти данные из буфера.
Давайте рассмотрим диаграмму, показывающую этот простой сценарий:
По своей сути, эта проблема имеет определенные сложности для решения :
- И производитель, и потребитель могут попытаться обновить очередь одновременно. Это может привести к потере данных или несоответствиям.
- Производители могут быть медленнее, чем потребители. В таких случаях потребитель будет быстро обрабатывать элементы и ждать.
- В некоторых случаях потребитель может быть медленнее производителя. Эта ситуация приводит к проблеме переполнения очереди.
- В реальных сценариях у нас может быть несколько производителей, несколько потребителей или и то, и другое. Это может привести к тому, что одно и то же сообщение будет обрабатываться разными потребителями.
На диаграмме ниже показан случай с несколькими производителями и несколькими потребителями:
Нам нужно справиться с совместным использованием ресурсов и синхронизацией, чтобы решить несколько сложностей:
- Синхронизация в очереди при добавлении и удалении данных
- Если очередь пуста, потребитель должен ждать, пока производитель добавит новые данные в очередь.
- Когда очередь заполнена, производитель должен ждать, пока потребитель не использует данные, а в очереди есть какой-то пустой буфер.
3. Пример Java с использованием потоков
Мы определили отдельный класс для каждой сущности проблемы.
3.1. Класс сообщения
Класс Message
содержит полученные данные:
public class Message {
private int id;
private double data;
// constructors and getter/setters
}
Данные могут быть любого типа. Это может быть строка JSON, сложный объект или просто число. Кроме того, не обязательно оборачивать данные в класс Message .
3.2. Класс DataQueue
Общая очередь и связанные с ней объекты заключены в класс DataQueue
:
public class DataQueue {
private final Queue<Message> queue = new LinkedList<>();
private final int maxSize;
private final Object FULL_QUEUE = new Object();
private final Object EMPTY_QUEUE = new Object();
DataQueue(int maxSize) {
this.maxSize = maxSize;
}
// other methods
}
Для создания ограниченного буфера берется очередь
и ее maxSize
.
В Java блок synchronized
использует объект для синхронизации потока. Каждый объект имеет внутреннюю блокировку. Только поток, который первым получает блокировку, может выполнять синхронизированный
блок.
Здесь мы создали две ссылки, FULL_QUEUE
и EMPTY_QUEUE
, чтобы использовать их для синхронизации. Поскольку у этих дескрипторов нет другого назначения, мы инициализировали их с помощью класса Object .
Когда очередь заполнена, производитель ожидает объект FULL_QUEUE
. И потребитель уведомляет, как только он потребляет сообщение.
Процесс-производитель вызывает метод waitOnFull
:
public void waitOnFull() throws InterruptedException {
synchronized (FULL_QUEUE) {
FULL_QUEUE.wait();
}
}
И процесс-потребитель уведомляет производителя с помощью метода notifyAllForFull
:
public void notifyAllForFull() {
synchronized (FULL_QUEUE) {
FULL_QUEUE.notifyAll();
}
}
Если очередь пуста, потребитель ожидает объект EMPTY_QUEUE
. И производитель уведомляет его, как только сообщение добавляется в очередь.
Процесс-потребитель ожидает, используя метод waitOnEmpty
:
public void waitOnEmpty() throws InterruptedException {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.wait();
}
}
Производитель уведомляет потребителя с помощью метода notifyAllForEmpty
:
public void notifyAllForEmpty() {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.notify();
}
}
А производитель использует метод add()
для добавления сообщения в очередь:
public void add(Message message) {
synchronized (queue) {
queue.add(message);
}
}
Потребитель вызывает метод удаления
для извлечения сообщения из очереди:
public Message remove() {
synchronized (queue) {
return queue.poll();
}
}
3.3. Класс продюсера
Класс Producer
реализует интерфейс Runnable , позволяющий создавать потоки:
public class Producer implements Runnable {
private final DataQueue dataQueue;
private volatile boolean runFlag;
public Producer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
runFlag = true;
}
@Override
public void run() {
produce();
}
// Other methods
}
Конструктор использует общий параметр dataQueue
. Переменная- член runFlag
помогает изящно остановить процесс производителя. Он инициализируется значением true
.
Запуск потока вызывает метод product()
:
public void produce() {
while (runFlag) {
Message message = generateMessage();
while (dataQueue.isFull()) {
try {
dataQueue.waitOnFull();
} catch (InterruptedException e) {
break;
}
}
if (!runFlag) {
break;
}
dataQueue.add(message);
dataQueue.notifyAllForEmpty();
}
}
Производитель выполняет шаги непрерывно в цикле while
. Этот цикл прерывается, когда runFlag
имеет значение false
.
На каждой итерации он генерирует сообщение. Затем он проверяет, заполнена ли очередь, и ждет, когда это необходимо. Вместо блока if
используется цикл while
для проверки заполнения очереди. Это делается для того, чтобы избежать ложного пробуждения из состояния ожидания.
Когда производитель выходит из состояния ожидания, он проверяет, нужно ли ему продолжать или выйти из процесса. Он добавляет сообщение в очередь и уведомляет потребителя, ожидающего в пустой очереди.
Метод stop
() изящно завершает процесс:
public void stop() {
runFlag = false;
dataQueue.notifyAllForFull();
}
После изменения runFlag
на false
все производители, ожидающие в состоянии «очередь заполнена», получают уведомление. Это гарантирует завершение всех потоков производителя.
3.4. Потребительский класс
Класс Consumer
реализует Runnable
, чтобы разрешить создание потоков:
public class Consumer implements Runnable {
private final DataQueue dataQueue;
private volatile boolean runFlag;
public Consumer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
runFlag = true;
}
@Override
public void run() {
consume();
}
// Other methods
}
Его конструктор имеет общую очередь данных
в качестве параметра. runFlag инициализируется значением
true
. Этот флаг останавливает процесс потребителя, когда это необходимо.
Когда поток запускается, он запускает метод потребления
:
public void consume() {
while (runFlag) {
Message message;
if (dataQueue.isEmpty()) {
try {
dataQueue.waitOnEmpty();
} catch (InterruptedException e) {
break;
}
}
if (!runFlag) {
break;
}
message = dataQueue.remove();
dataQueue.notifyAllForFull();
useMessage(message);
}
}
Он имеет постоянно работающий цикл while
. И этот процесс изящно останавливается, когда runFlag
имеет значение false
.
Каждая итерация проверяет, пуста ли очередь. Если очередь пуста, потребитель ожидает создания сообщения . Это ожидание также используется циклом while
, чтобы избежать ложных пробуждений.
Когда потребитель выходит из состояния ожидания, он проверяет runFlag
. Если флаг равен false
, то он выходит из цикла. В противном случае он считывает сообщение из очереди и уведомляет производителя о том, что оно ожидает в состоянии «полная очередь». Наконец, он потребляет сообщение.
Чтобы изящно остановить процесс, он использует метод stop()
:
public void stop() {
runFlag = false;
dataQueue.notifyAllForEmpty();
}
После того , как для runFlag
установлено значение false
, все потребители, ожидающие в состоянии пустой очереди, получают уведомление. Это гарантирует завершение всех потребительских потоков.
3.5. Запуск потоков производителей и потребителей
Давайте создадим объект dataQueue
с максимальной необходимой емкостью:
DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);
Теперь давайте создадим объект производителя и поток:
Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);
Затем мы инициализируем объект- потребитель
и поток:
Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);
Наконец, мы запускаем потоки, чтобы инициировать процесс:
producerThread.start();
consumerThread.start();
Он работает непрерывно, пока мы не захотим остановить эти потоки. Остановить их просто:
producer.stop();
consumer.stop();
3.6. Запуск нескольких производителей и потребителей
Работа с несколькими производителями и потребителями аналогична случаю с одним производителем и потребителем. Нам просто нужно создать необходимое количество потоков и запустить их.
Давайте создадим несколько производителей и потоков и запустим их:
Producer producer = new Producer(dataQueue);
for(int i = 0; i < producerCount; i++) {
Thread producerThread = new Thread(producer);
producerThread.start();
}
Далее создадим необходимое количество объектов-потребителей и потоков:
Consumer consumer = new Consumer(dataQueue);
for(int i = 0; i < consumerCount; i++) {
Thread consumerThread = new Thread(consumer);
consumerThread.start();
}
Мы можем изящно остановить процесс, вызвав метод stop()
для объектов производителя и потребителя:
producer.stop();
consumer.stop();
4. Упрощенный пример с использованием BlockingQueue
Java предоставляет интерфейс BlockingQueue
, который является потокобезопасным. Другими словами, несколько потоков могут добавлять и удалять из этой очереди без каких-либо проблем параллелизма .
Его метод put()
блокирует вызывающий поток, если очередь заполнена. Точно так же, если очередь пуста, ее метод take()
блокирует вызывающий поток.
4.1. Создать ограниченную очередь блокировки
Мы можем создать ограниченную BlockingQueue
, используя значение емкости в конструкторе:
BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);
4.2. Упрощенный метод производства
В методе product()
мы можем избежать явной синхронизации для нашей очереди:
private void produce() {
while (true) {
double value = generateValue();
try {
blockingQueue.put(value);
} catch (InterruptedException e) {
break;
}
}
}
Этот метод непрерывно создает объекты и просто добавляет их в очередь.
4.3. Упрощенный метод потребления
Метод потребления()
явно не использует синхронизацию:
private void consume() {
while (true) {
Double value;
try {
value = blockingQueue.take();
} catch (InterruptedException e) {
break;
}
// Consume value
}
}
Он просто берет значение из очереди и непрерывно использует его.
4.4. Запуск потоков производителей и потребителей
Мы можем создать столько потоков производителей и потребителей, сколько потребуется:
for (int i = 0; i < 2; i++) {
Thread producerThread = new Thread(this::produce);
producerThread.start();
}
for (int i = 0; i < 3; i++) {
Thread consumerThread = new Thread(this::consume);
consumerThread.start();
}
5. Вывод
В этой статье мы узнали, как реализовать проблему «производитель-потребитель» с помощью потоков Java. Кроме того, мы научились запускать сценарии с несколькими производителями и потребителями.
Полный пример кода можно найти на GitHub .