Перейти к основному содержимому

Руководство по java.util.concurrent.BlockingQueue

· 6 мин. чтения

1. Обзор

В этой статье мы рассмотрим одну из самых полезных конструкций java.util.concurrent для решения проблемы параллельного производителя-потребителя. Мы рассмотрим API интерфейса BlockingQueue и то, как методы этого интерфейса облегчают написание параллельных программ.

Позже в этой статье мы покажем пример простой программы с несколькими потоками-производителями и несколькими потоками-потребителями.

2. Типы блокирующих очередей

Мы можем выделить два типа BlockingQueue :

  • неограниченная очередь – может расти практически бесконечно
  • ограниченная очередь – с заданной максимальной емкостью

2.1. Неограниченная очередь

Создать неограниченные очереди просто:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

Для емкости blockingQueue будет установлено значение Integer.MAX_VALUE. Все операции, которые добавляют элемент в неограниченную очередь, никогда не будут блокироваться, поэтому она может вырасти до очень большого размера.

Самое главное при разработке программы производитель-потребитель с использованием неограниченной очереди BlockingQueue заключается в том, что потребители должны иметь возможность потреблять сообщения так же быстро, как производители добавляют сообщения в очередь. В противном случае память может заполниться, и мы получим исключение OutOfMemory .

2.2. Ограниченная очередь

Второй тип очередей — это ограниченная очередь. Мы можем создавать такие очереди, передавая емкость в качестве аргумента конструктору:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

Здесь у нас есть blockingQueue , емкость которого равна 10. Это означает, что когда производитель пытается добавить элемент в уже заполненную очередь, в зависимости от метода, который использовался для его добавления ( offer() , add() или put () ), он будет блокироваться до тех пор, пока не освободится место для вставки объекта. В противном случае операции будут неудачными.

Использование ограниченной очереди — хороший способ разработки параллельных программ, потому что, когда мы вставляем элемент в уже заполненную очередь, эти операции должны ждать, пока потребители догонят и освободит место в очереди. Это дает нам дросселирование без каких-либо усилий с нашей стороны.

3. API -интерфейс BlockingQueue

В интерфейсе BlockingQueue есть два типа методов : методы, отвечающие за добавление элементов в очередь, и методы, извлекающие эти элементы. Каждый метод из этих двух групп ведет себя по-разному, если очередь заполнена/пуста.

3.1. Добавление элементов

  • add() — возвращает true , если вставка прошла успешно, в противном случае генерирует исключение IllegalStateException.
  • put() — вставляет указанный элемент в очередь, при необходимости ожидая свободного слота
  • offer() – возвращает true , если вставка прошла успешно, иначе false
  • offer(E e, long timeout, TimeUnit unit) – пытается вставить элемент в очередь и ждет доступного слота в течение заданного таймаута

3.2. Получение элементов

  • take() — ждет головной элемент очереди и удаляет его. Если очередь пуста, она блокируется и ждет, пока элемент станет доступным.
  • poll(long timeout, блок TimeUnit) — извлекает и удаляет голову очереди, ожидая до указанного времени ожидания, если необходимо, чтобы элемент стал доступным. Возвращает null после тайм-аута ``

Эти методы являются наиболее важными строительными блоками интерфейса BlockingQueue при создании программ производитель-потребитель.

4. Пример многопоточного производителя-потребителя

Создадим программу, состоящую из двух частей — производителя и потребителя.

Производитель создаст случайное число от 0 до 100 и поместит это число в BlockingQueue . У нас будет 4 потока-производителя, и мы будем использовать метод put() для блокировки до тех пор, пока в очереди не будет свободного места.

Важно помнить, что нам нужно, чтобы наши потребительские потоки не ждали появления элемента в очереди на неопределенный срок.

Хороший способ сигнализировать от производителя к потребителю, что сообщений для обработки больше нет, — отправить специальное сообщение, называемое ядовитой пилюлей. Нам нужно отправить столько ядовитых пилюль, сколько у нас есть потребителей. Затем, когда потребитель возьмет это специальное сообщение о ядовитых пилюлях из очереди, оно изящно завершит выполнение.

Давайте посмотрим на класс производителя:

public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;

public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}

Наш конструктор производителя принимает в качестве аргумента BlockingQueue , который используется для координации обработки между производителем и потребителем. Мы видим, что метод generateNumbers() поместит в очередь 100 элементов. Требуется также сообщение о ядовитых пилюлях, чтобы знать, какой тип сообщения должен быть помещен в очередь, когда выполнение будет завершено. Это сообщение должно быть помещено в очередь отравляющих PillPerProducer раз.

Каждый потребитель будет брать элемент из BlockingQueue с помощью метода take() , поэтому он будет блокироваться до тех пор, пока в очереди не появится элемент. После взятия целого числа из очереди он проверяет, является ли сообщение ядовитой пилюлей, и если да, то выполнение потока завершается. В противном случае он выведет результат на стандартный вывод вместе с именем текущего потока.

Это даст нам представление о внутренней работе наших потребителей:

public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;

public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " result: " + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

Важно отметить использование очереди. Как и в конструкторе производителя, в качестве аргумента передается очередь. Мы можем это сделать, потому что BlockingQueue может быть разделена между потоками без какой-либо явной синхронизации.

Теперь, когда у нас есть производитель и потребитель, мы можем начать нашу программу. Нам нужно определить емкость очереди, и мы устанавливаем ее на 100 элементов.

Мы хотим иметь 4 потока-производителя, а количество потоков-потребителей будет равно количеству доступных процессоров:

int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}

new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();

BlockingQueue создается с помощью конструкции с емкостью. Мы создаем 4 производителя и N потребителей. Мы указываем, что наше сообщение с ядовитой пилюлей должно быть Integer.MAX_VALUE , потому что такое значение никогда не будет отправлено нашим производителем при нормальных рабочих условиях. Здесь важно отметить, что BlockingQueue используется для координации работы между ними.

Когда мы запускаем программу, 4 потока производителя будут помещать случайные целые числа в BlockingQueue , а потребители будут брать эти элементы из очереди. Каждый поток выводит на стандартный вывод имя потока вместе с результатом.

5. Вывод

В этой статье показано практическое использование BlockingQueue и объясняются методы, используемые для добавления и извлечения из него элементов. Кроме того, мы показали, как создать многопоточную программу производитель-потребитель, используя BlockingQueue для координации работы между производителями и потребителями.

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект на основе Maven, поэтому его должно быть легко импортировать и запускать как есть.