1. Обзор
В этой статье мы рассмотрим одну из самых полезных конструкций java.util.concurrent
для решения проблемы параллельного производителя-потребителя. Мы рассмотрим API интерфейса BlockingQueue
и то, как методы этого интерфейса облегчают написание параллельных программ.
Позже в этой статье мы покажем пример простой программы с несколькими потоками-производителями и несколькими потоками-потребителями.
2. Типы блокирующих очередей
Мы можем выделить два типа BlockingQueue
:
- неограниченная очередь – может расти практически бесконечно
- ограниченная очередь – с заданной максимальной емкостью
2.1. Неограниченная очередь
Создать неограниченные очереди просто:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
Для емкости blockingQueue
будет установлено значение Integer.MAX_VALUE.
Все операции, которые добавляют элемент в неограниченную очередь, никогда не будут блокироваться, поэтому она может вырасти до очень большого размера.
Самое главное при разработке программы производитель-потребитель с использованием неограниченной очереди BlockingQueue заключается в том, что потребители должны иметь возможность потреблять сообщения так же быстро, как производители добавляют сообщения в очередь. В противном случае память может заполниться, и мы получим исключение OutOfMemory
.
2.2. Ограниченная очередь
Второй тип очередей — это ограниченная очередь. Мы можем создавать такие очереди, передавая емкость в качестве аргумента конструктору:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
Здесь у нас есть blockingQueue
, емкость которого равна 10. Это означает, что когда производитель пытается добавить элемент в уже заполненную очередь, в зависимости от метода, который использовался для его добавления ( offer()
, add()
или put ()
), он будет блокироваться до тех пор, пока не освободится место для вставки объекта. В противном случае операции будут неудачными.
Использование ограниченной очереди — хороший способ разработки параллельных программ, потому что, когда мы вставляем элемент в уже заполненную очередь, эти операции должны ждать, пока потребители догонят и освободит место в очереди. Это дает нам дросселирование без каких-либо усилий с нашей стороны.
3. API -интерфейс BlockingQueue
В интерфейсе BlockingQueue есть два типа методов
:
методы, отвечающие за добавление элементов в очередь, и методы, извлекающие эти элементы. Каждый метод из этих двух групп ведет себя по-разному, если очередь заполнена/пуста.
3.1. Добавление элементов
add() —
возвращаетtrue
, если вставка прошла успешно, в противном случае генерируетисключение IllegalStateException.
put() —
вставляет указанный элемент в очередь, при необходимости ожидая свободного слотаoffer() –
возвращаетtrue
, если вставка прошла успешно, иначеfalse
offer(E e, long timeout, TimeUnit unit) –
пытается вставить элемент в очередь и ждет доступного слота в течение заданного таймаута
3.2. Получение элементов
take()
— ждет головной элемент очереди и удаляет его. Если очередь пуста, она блокируется и ждет, пока элемент станет доступным.poll(long timeout, блок TimeUnit) —
извлекает и удаляет голову очереди, ожидая до указанного времени ожидания, если необходимо, чтобы элемент стал доступным. Возвращаетnull
после тайм-аута ``
Эти методы являются наиболее важными строительными блоками интерфейса BlockingQueue
при создании программ производитель-потребитель.
4. Пример многопоточного производителя-потребителя
Создадим программу, состоящую из двух частей — производителя и потребителя.
Производитель создаст случайное число от 0 до 100 и поместит это число в BlockingQueue
. У нас будет 4 потока-производителя, и мы будем использовать метод put()
для блокировки до тех пор, пока в очереди не будет свободного места.
Важно помнить, что нам нужно, чтобы наши потребительские потоки не ждали появления элемента в очереди на неопределенный срок.
Хороший способ сигнализировать от производителя к потребителю, что сообщений для обработки больше нет, — отправить специальное сообщение, называемое ядовитой пилюлей. Нам нужно отправить столько ядовитых пилюль, сколько у нас есть потребителей. Затем, когда потребитель возьмет это специальное сообщение о ядовитых пилюлях из очереди, оно изящно завершит выполнение.
Давайте посмотрим на класс производителя:
public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}
Наш конструктор производителя принимает в качестве аргумента BlockingQueue
, который используется для координации обработки между производителем и потребителем. Мы видим, что метод generateNumbers()
поместит в очередь 100 элементов. Требуется также сообщение о ядовитых пилюлях, чтобы знать, какой тип сообщения должен быть помещен в очередь, когда выполнение будет завершено. Это сообщение должно быть помещено в очередь отравляющих PillPerProducer
раз.
Каждый потребитель будет брать элемент из BlockingQueue
с помощью метода take()
, поэтому он будет блокироваться до тех пор, пока в очереди не появится элемент. После взятия целого
числа из очереди он проверяет, является ли сообщение ядовитой пилюлей, и если да, то выполнение потока завершается. В противном случае он выведет результат на стандартный вывод вместе с именем текущего потока.
Это даст нам представление о внутренней работе наших потребителей:
public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " result: " + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Важно отметить использование очереди. Как и в конструкторе производителя, в качестве аргумента передается очередь. Мы можем это сделать, потому что BlockingQueue
может быть разделена между потоками без какой-либо явной синхронизации.
Теперь, когда у нас есть производитель и потребитель, мы можем начать нашу программу. Нам нужно определить емкость очереди, и мы устанавливаем ее на 100 элементов.
Мы хотим иметь 4 потока-производителя, а количество потоков-потребителей будет равно количеству доступных процессоров:
int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
BlockingQueue
создается с помощью конструкции с емкостью. Мы создаем 4 производителя и N потребителей. Мы указываем, что наше сообщение с ядовитой пилюлей должно быть Integer.MAX_VALUE
, потому что такое значение никогда не будет отправлено нашим производителем при нормальных рабочих условиях. Здесь важно отметить, что BlockingQueue
используется для координации работы между ними.
Когда мы запускаем программу, 4 потока производителя будут помещать случайные целые числа
в BlockingQueue
, а потребители будут брать эти элементы из очереди. Каждый поток выводит на стандартный вывод имя потока вместе с результатом.
5. Вывод
В этой статье показано практическое использование BlockingQueue
и объясняются методы, используемые для добавления и извлечения из него элементов. Кроме того, мы показали, как создать многопоточную программу производитель-потребитель, используя BlockingQueue
для координации работы между производителями и потребителями.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект на основе Maven, поэтому его должно быть легко импортировать и запускать как есть.