1. Обзор
В этой статье мы рассмотрим конструкцию TransferQueue
из стандартного пакета java.util.concurrent
.
Проще говоря, эта очередь позволяет нам создавать программы в соответствии с шаблоном производитель-потребитель и координировать передачу сообщений от производителей к потребителям.
Реализация на самом деле похожа на BlockingQueue ,
но дает нам новую возможность реализовать форму обратного давления. Это означает, что когда производитель отправляет сообщение потребителю с помощью метода transfer()
, производитель будет оставаться заблокированным до тех пор, пока сообщение не будет использовано.
2. Один производитель — ноль потребителей
Давайте протестируем метод transfer()
из TransferQueue
— ожидаемое поведение заключается в том, что производитель будет заблокирован до тех пор, пока потребитель не получит сообщение из очереди с помощью метода take()
.
Для этого мы создадим программу с одним производителем, но без потребителей. Первый вызов transfer()
из потока производителя будет заблокирован на неопределенный срок, так как у нас нет потребителей для извлечения этого элемента из очереди.
Давайте посмотрим, как выглядит класс Producer :
class Producer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private Integer numberOfMessagesToProduce;
public AtomicInteger numberOfProducedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToProduce; i++) {
try {
boolean added
= transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
if(added){
numberOfProducedMessages.incrementAndGet();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// standard constructors
}
Мы передаем экземпляр TransferQueue
в конструктор вместе с именем, которое мы хотим дать нашему производителю, и количеством элементов, которые должны быть переданы в очередь.
Обратите внимание, что мы используем метод tryTransfer()
с заданным временем ожидания. Мы ждем четыре секунды, и если производитель не может передать сообщение в течение заданного таймаута, он возвращает false
и переходит к следующему сообщению. Производитель имеет переменную numberOfProducedMessages
для отслеживания количества созданных сообщений.
Далее, давайте посмотрим на класс Consumer :
class Consumer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private int numberOfMessagesToConsume;
public AtomicInteger numberOfConsumedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToConsume; i++) {
try {
String element = transferQueue.take();
longProcessing(element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(String element)
throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(500);
}
// standard constructors
}
Он похож на производителя, но мы получаем элементы из очереди с помощью метода take()
. Мы также моделируем некоторые длительные действия, используя метод longProcessing()
, в котором мы увеличиваем переменную numberOfConsumedMessages
, которая является счетчиком полученных сообщений.
Теперь давайте начнем нашу программу только с одним производителем:
@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
// when
exService.execute(producer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}
Мы хотим отправить в очередь три элемента, но производитель заблокирован для первого элемента, и нет ни одного потребителя, который мог бы получить этот элемент из очереди .
Мы используем метод tryTransfer ()
` , который будет блокироваться до тех пор, пока сообщение не будет использовано или не истечет время ожидания. По истечении времени ожидания он вернет
false` , чтобы указать, что передача не удалась, и попытается передать следующую. Это вывод из предыдущего примера:
Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...
3. Один производитель – один потребитель
Протестируем ситуацию, когда есть один производитель и один потребитель:
@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
Consumer consumer = new Consumer(transferQueue, "1", 3);
// when
exService.execute(producer);
exService.execute(consumer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 3);
assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}
TransferQueue используется как точка
обмена, и пока потребитель не использует элемент из очереди, производитель не может продолжить добавление в нее другого элемента. Посмотрим на вывод программы:
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2
Мы видим, что создание и потребление элементов из очереди происходит последовательно из-за спецификации TransferQueue.
4. Много производителей — много потребителей
В последнем примере мы рассмотрим наличие нескольких потребителей и нескольких производителей:
@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(3);
Producer producer1 = new Producer(transferQueue, "1", 3);
Producer producer2 = new Producer(transferQueue, "2", 3);
Consumer consumer1 = new Consumer(transferQueue, "1", 3);
Consumer consumer2 = new Consumer(transferQueue, "2", 3);
// when
exService.execute(producer1);
exService.execute(producer2);
exService.execute(consumer1);
exService.execute(consumer2);
// then
exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}
В этом примере у нас есть два потребителя и два производителя. При запуске программы мы видим, что оба производителя могут произвести один элемент и после этого они будут блокироваться до тех пор, пока один из потребителей не возьмет этот элемент из очереди:
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2
5. Вывод
В этой статье мы рассмотрели конструкцию TransferQueue
из пакета java.util.concurrent
.
Мы увидели, как реализовать программу производитель-потребитель, используя эту конструкцию. Мы использовали метод transfer()
для создания формы обратного давления, когда производитель не может опубликовать другой элемент, пока потребитель не извлечет элемент из очереди.
TransferQueue может быть очень полезен, когда нам не нужен перепроизводящий производитель, который заполнит очередь сообщениями, что приведет к ошибкам
OutOfMemory
. В таком дизайне потребитель будет диктовать скорость, с которой производитель будет создавать сообщения.
Все эти примеры и фрагменты кода можно найти на GitHub — это проект Maven, поэтому его легко импортировать и запускать как есть.