1. Обзор
В этой статье мы рассмотрим конструкцию DelayQueue
из пакета java.util.concurrent
. Это блокирующая очередь, которую можно использовать в программах производитель-потребитель.
У него есть очень полезная характеристика — когда потребитель хочет взять элемент из очереди, он может взять его только тогда, когда истечет задержка для этого конкретного элемента.
2. Реализация Delayed
для элементов в DelayQueue
Каждый элемент, который мы хотим поместить в DelayQueue
, должен реализовать интерфейс Delayed .
Допустим, мы хотим создать класс DelayObject
. Экземпляры этого класса будут помещены в DelayQueue.
Мы передадим данные String
и delayInMilliseconds
как и аргументы его конструктору:
public class DelayObject implements Delayed {
private String data;
private long startTime;
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
Мы определяем startTime —
это время, когда элемент должен быть использован из очереди. Далее нам нужно реализовать метод getDelay()
— он должен возвращать оставшуюся задержку, связанную с этим объектом, в заданную единицу времени.
Поэтому нам нужно использовать метод TimeUnit.convert()
, чтобы вернуть оставшуюся задержку в правильном TimeUnit:
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
Когда потребитель пытается взять элемент из очереди, DelayQueue
выполнит getDelay()
, чтобы узнать, разрешено ли возвращать этот элемент из очереди. Если метод getDelay()
вернет ноль или отрицательное число, это означает, что его можно извлечь из очереди.
Нам также необходимо реализовать метод compareTo()
, потому что элементы в DelayQueue
будут отсортированы по времени истечения срока действия. Элемент, срок действия которого истекает первым, остается в начале очереди, а элемент с наибольшим сроком действия — в хвосте очереди:
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
3. DelayQueue
потребитель и производитель
Чтобы иметь возможность протестировать нашу DelayQueue
, нам нужно реализовать логику производителя и потребителя. Класс производителя принимает в качестве аргументов очередь, количество создаваемых элементов и задержку каждого сообщения в миллисекундах.
Затем, когда вызывается метод run() , он помещает элементы в очередь и приостанавливается на 500 миллисекунд после каждого помещения:
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;
// standard constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
Реализация потребителя очень похожа, но она также отслеживает количество потребленных сообщений:
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
// standard constructors
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4. Тест использования DelayQueue
Чтобы проверить поведение DelayQueue,
мы создадим один поток производителя и один поток потребителя.
Производитель поместит
два объекта в очередь с задержкой 500 миллисекунд. Тест утверждает, что потребитель получил два сообщения:
@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
Мы можем заметить, что запуск этой программы приведет к следующему выводу:
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Производитель ставит объект, и через некоторое время потребляется первый объект, для которого истекла задержка.
Такая же ситуация произошла и со вторым элементом.
5. Потребитель не может потреблять в данное время
Допустим, у нас есть производитель, который производит элемент, срок действия которого истечет через 10 секунд :
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
Мы начнем наш тест, но он завершится через 5 секунд. Из-за характеристик DelayQueue
потребитель не сможет использовать сообщение из очереди, поскольку срок действия элемента еще не истек:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
Обратите внимание, что numberOfConsumedElements
потребителя имеет значение, равное нулю.
6. Создание элемента с немедленным истечением срока действия
Когда реализации метода getDelay
()
сообщения с задержкой возвращают отрицательное число, это означает, что срок действия данного элемента уже истек. В этой ситуации производитель немедленно потребляет этот элемент.
Мы можем протестировать ситуацию создания элемента с отрицательной задержкой:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
Когда мы запускаем тестовый пример, потребитель немедленно потребляет элемент, потому что срок его действия уже истек:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
7. Заключение
В этой статье мы рассмотрели конструкцию DelayQueue
из пакета java.util.concurrent
.
Мы реализовали элемент Delayed
, который создавался и потреблялся из очереди.
Мы использовали нашу реализацию DelayQueue
для использования просроченных элементов.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub, который является проектом Maven, поэтому его должно быть легко импортировать и запускать как есть.