Перейти к основному содержимому

Как планировать задания с учетом приоритетов в Java?

· 3 мин. чтения

1. Введение

В многопоточной среде иногда нам нужно планировать задачи на основе настраиваемых критериев, а не только времени создания.

Давайте посмотрим, как мы можем добиться этого в Java — используя PriorityBlockingQueue .

2. Обзор

Допустим, у нас есть задания, которые мы хотим выполнить в зависимости от их приоритета:

public class Job implements Runnable {
private String jobName;
private JobPriority jobPriority;

@Override
public void run() {
System.out.println("Job:" + jobName +
" Priority:" + jobPriority);
Thread.sleep(1000); // to simulate actual execution time
}

// standard setters and getters
}

В демонстрационных целях мы печатаем имя и приоритет задания в методе run() .

Мы также добавили функцию sleep() , чтобы имитировать более длительную работу; пока задание выполняется, в приоритетной очереди накапливается больше заданий.

Наконец, JobPriority — это простое перечисление:

public enum JobPriority {
HIGH,
MEDIUM,
LOW
}

3. Пользовательский компаратор

Нам нужно написать компаратор, определяющий наши пользовательские критерии; а в Java 8 это тривиально :

Comparator.comparing(Job::getJobPriority);

4. Планировщик приоритетных заданий

Выполнив все настройки, давайте теперь реализуем простой планировщик заданий, который использует один исполнитель потока для поиска заданий в PriorityBlockingQueue и их выполнения:

public class PriorityJobScheduler {

private ExecutorService priorityJobPoolExecutor;
private ExecutorService priorityJobScheduler
= Executors.newSingleThreadExecutor();
private PriorityBlockingQueue<Job> priorityQueue;

public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
priorityQueue = new PriorityBlockingQueue<Job>(
queueSize,
Comparator.comparing(Job::getJobPriority));
priorityJobScheduler.execute(() -> {
while (true) {
try {
priorityJobPoolExecutor.execute(priorityQueue.take());
} catch (InterruptedException e) {
// exception needs special handling
break;
}
}
});
}

public void scheduleJob(Job job) {
priorityQueue.add(job);
}
}

Ключевым моментом здесь является создание экземпляра PriorityBlockingQueue типа Job с пользовательским компаратором. Следующее задание для выполнения выбирается из очереди с помощью метода take() , который извлекает и удаляет заголовок очереди.

Клиентскому коду теперь достаточно вызвать функцию scheduleJob() , которая добавляет задание в очередь. PriorityQueue.add () помещает задание в очередь в соответствующей позиции по сравнению с существующими заданиями в очереди, используя JobExecutionComparator .

Обратите внимание, что фактические задания выполняются с использованием отдельного ExecutorService с выделенным пулом потоков.

5. Демо

Наконец, вот краткая демонстрация планировщика:

private static int POOL_SIZE = 1;
private static int QUEUE_SIZE = 10;

@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
Job job1 = new Job("Job1", JobPriority.LOW);
Job job2 = new Job("Job2", JobPriority.MEDIUM);
Job job3 = new Job("Job3", JobPriority.HIGH);
Job job4 = new Job("Job4", JobPriority.MEDIUM);
Job job5 = new Job("Job5", JobPriority.LOW);
Job job6 = new Job("Job6", JobPriority.HIGH);

PriorityJobScheduler pjs = new PriorityJobScheduler(
POOL_SIZE, QUEUE_SIZE);

pjs.scheduleJob(job1);
pjs.scheduleJob(job2);
pjs.scheduleJob(job3);
pjs.scheduleJob(job4);
pjs.scheduleJob(job5);
pjs.scheduleJob(job6);

// clean up
}

Чтобы продемонстрировать, что задания выполняются в порядке приоритета, мы сохранили POOL_SIZE равным 1, хотя QUEUE_SIZE равно 10. Мы предоставляем планировщику задания с различным приоритетом.

Вот пример вывода, который мы получили для одного из прогонов:

Job:Job3 Priority:HIGH
Job:Job6 Priority:HIGH
Job:Job4 Priority:MEDIUM
Job:Job2 Priority:MEDIUM
Job:Job1 Priority:LOW
Job:Job5 Priority:LOW

Результат может варьироваться в зависимости от прогона. Однако у нас никогда не должно быть случая, когда задание с более низким приоритетом выполняется, даже если очередь содержит задание с более высоким приоритетом.

6. Заключение

В этом кратком руководстве мы увидели, как PriorityBlockingQueue можно использовать для выполнения заданий в пользовательском порядке приоритета.

Как обычно, исходные файлы можно найти на GitHub .