Перейти к основному содержимому

Руководство по PriorityBlockingQueue в Java

· 4 мин. чтения

1. Введение

В этой статье мы сосредоточимся на классе PriorityBlockingQueue и рассмотрим несколько практических примеров.

Исходя из предположения, что мы уже знаем, что такое очередь , мы сначала продемонстрируем , как элементы в PriorityBlockingQueue упорядочены по приоритету .

После этого мы продемонстрируем, как этот тип очереди можно использовать для блокировки потока.

Наконец, мы покажем, как совместное использование этих двух функций может быть полезным при обработке данных в нескольких потоках.

2. Приоритет элементов

В отличие от стандартной очереди, вы не можете просто добавить элемент любого типа в PriorityBlockingQueue. Есть два варианта:

  1. Добавление элементов, реализующих Comparable
  2. Добавление элементов, которые не реализуют Comparable , при условии, что вы также предоставляете Comparator

При использовании реализации Comparator или Comparable для сравнения элементов PriorityBlockingQueue всегда будет сортироваться.

Цель состоит в том, чтобы реализовать логику сравнения таким образом, чтобы элемент с наивысшим приоритетом всегда упорядочивался первым . Затем, когда мы удаляем элемент из нашей очереди, он всегда будет иметь наивысший приоритет.

Для начала давайте воспользуемся нашей очередью в одном потоке, а не в нескольких. Это позволяет легко доказать, как элементы упорядочены в модульном тесте:

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ArrayList<Integer> polledElements = new ArrayList<>();

queue.add(1);
queue.add(5);
queue.add(2);
queue.add(3);
queue.add(4);

queue.drainTo(polledElements);

assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);

Как мы видим, несмотря на добавление элементов в очередь в случайном порядке, они будут упорядочены, когда мы начнем их опрашивать. Это связано с тем, что класс Integer реализует Comparable, который, в свою очередь, будет использоваться, чтобы убедиться, что мы извлекаем их из очереди в порядке возрастания.

Также стоит отметить, что когда два элемента сравниваются и оказываются одинаковыми, нет никакой гарантии, как они будут упорядочены.

3. Использование очереди для блокировки

Если бы мы имели дело со стандартной очередью, мы бы вызвали poll() для извлечения элементов. Однако, если очередь пуста, вызов poll() вернет null.

PriorityBlockingQueue реализует интерфейс BlockingQueue , который дает нам несколько дополнительных методов, позволяющих блокироваться при удалении из пустой очереди . Давайте попробуем использовать метод take() , который должен делать именно это:

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

new Thread(() -> {
System.out.println("Polling...");

try {
Integer poll = queue.take();
System.out.println("Polled: " + poll);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");
queue.add(1);

Хотя использование sleep() — немного ненадежный способ демонстрации вещей, когда мы запустим этот код, мы увидим:

Polling...
Adding to queue
Polled: 1

Это доказывает, что функция take() заблокирована до тех пор, пока элемент не будет добавлен:

  1. Поток напечатает «Опрос», чтобы доказать, что он запущен.
  2. Затем тест приостанавливается примерно на пять секунд, чтобы доказать, что к этому моменту поток должен вызвать функцию take() .
  3. Мы добавляем в очередь и должны более или менее мгновенно увидеть «Опрошено: 1», чтобы доказать, что функция take() вернула элемент, как только он стал доступен.

Также стоит упомянуть, что интерфейс BlockingQueue также предоставляет нам способы блокировки при добавлении в полные очереди.

Однако PriorityBlockingQueue не имеет ограничений. Это означает, что он никогда не будет заполнен, поэтому всегда можно будет добавить новые элементы.

4. Использование блокировки и расстановки приоритетов вместе

Теперь, когда мы объяснили два ключевых понятия PriorityBlockingQueue, давайте воспользуемся ими обоими вместе. Мы можем просто расширить наш предыдущий пример, но на этот раз добавить в очередь больше элементов:

Thread thread = new Thread(() -> {
System.out.println("Polling...");
while (true) {
try {
Integer poll = queue.take();
System.out.println("Polled: " + poll);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
});

thread.start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");

queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));

Опять же, хотя это немного хрупко из-за использования sleep(), оно все же показывает нам допустимый вариант использования. Теперь у нас есть очередь, которая блокируется, ожидая добавления элементов. Затем мы добавляем множество элементов одновременно, а затем показываем, что они будут обрабатываться в порядке приоритета. Вывод будет выглядеть следующим образом:

Polling...
Adding to queue
Polled: 1
Polled: 1
Polled: 2
Polled: 5
Polled: 6
Polled: 6
Polled: 7

5. Вывод

В этом руководстве мы продемонстрировали, как мы можем использовать PriorityBlockingQueue для блокировки потока до тех пор, пока в него не будут добавлены некоторые элементы, а также то, что мы можем обрабатывать эти элементы в зависимости от их приоритета.

Реализацию этих примеров можно найти на GitHub . Это проект на основе Maven, поэтому его легко запустить как есть.