1. Обзор
Executor Framework в Java — это попытка отделить отправку задачи от выполнения задачи. Хотя этот подход очень хорошо абстрагирует детали выполнения задачи, иногда нам все же нужно настроить его для еще более оптимального выполнения.
В этом руководстве мы увидим, что происходит, когда пул потоков больше не может принимать задачи. Затем мы узнаем, как управлять этим краеугольным камнем, применяя соответствующие политики насыщения.
2. Пересмотр пулов потоков
На следующей диаграмме показано, как служба исполнителя работает внутри:
Вот что происходит, когда мы отправляем новую задачу исполнителю :
- Если один из потоков доступен, он обрабатывает задачу.
- В противном случае исполнитель добавляет новую задачу в свою очередь.
- Когда поток завершает текущую задачу, он берет другую из очереди.
2.1. ThreadPoolExecutor
_ ``
Большинство реализаций исполнителей используют хорошо известный ThreadPoolExecutor
в качестве базовой реализации. Поэтому, чтобы лучше понять, как работает очередь задач, нам следует более подробно рассмотреть ее конструктор:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
)
2.2. Размер основного пула
Параметр corePoolSize
определяет начальный размер пула потоков. Обычно исполнитель следит за тем, чтобы пул потоков содержал по крайней мере число потоков corePoolSize
.
Однако количество потоков может быть меньше, если мы включим параметр allowCoreThreadTimeOut
.
2.3. Максимальный размер пула
Предположим, что все основные потоки заняты выполнением нескольких задач. В результате исполнитель ставит новые задачи в очередь до тех пор, пока они не получат возможность быть обработанными позже.
Когда эта очередь заполняется, исполнитель может добавить больше потоков в пул потоков. MaximumPoolSize устанавливает
верхнюю границу числа потоков, которые потенциально может содержать пул потоков.
Когда эти потоки остаются бездействующими в течение некоторого времени, исполнитель может удалить их из пула. Следовательно, размер пула может уменьшиться до размера ядра.
2.4. очередь
Как мы видели ранее, когда все основные потоки заняты, исполнитель добавляет новые задачи в очередь. Существует три разных подхода к организации очереди :
Неограниченная очередь
: очередь может содержать неограниченное количество задач. Поскольку эта очередь никогда не заполняется, исполнитель игнорирует максимальный размер. Этот подход используют исполнители фиксированного размера и однопотоковые исполнители .Ограниченная очередь
: как следует из названия, очередь может содержать только ограниченное количество задач. В результате пул потоков будет увеличиваться при заполнении ограниченной очереди.Синхронная передача обслуживания
: как ни странно, эта очередь не может содержать никаких задач! При таком подходе мы можем поставить задачу в очередь тогда и только тогда, когда другой поток выбирает ту же задачу на другой стороне в то же время . Исполнитель кэшированного пула потоков использует этот подход внутри.
Предположим следующий сценарий, когда мы используем ограниченную организацию очередей или синхронную передачу обслуживания:
- Все основные потоки заняты
- Внутренняя очередь заполняется
- Пул потоков увеличивается до максимально возможного размера, и все эти потоки также заняты.
Что происходит, когда появляется новая задача?
3. Политика насыщения
Когда все потоки заняты, а внутренняя очередь заполняется, исполнитель становится насыщенным.
Исполнители могут выполнять предопределенные действия, как только они достигают насыщения. Эти действия известны как политики насыщения. Мы можем изменить политику насыщения исполнителя, передав экземпляр RejectedExecutionHandler
его конструктору.
К счастью, Java предоставляет несколько встроенных реализаций для этого класса, каждая из которых охватывает определенный вариант использования. В следующих разделах мы подробно оценим эти политики.
3.1. Политика отмены
Политикой по умолчанию является политика прерывания . Политика отмены заставляет исполнителя генерировать исключение RejectedExecutionException
:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
executor.execute(() -> waitFor(250));
assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
.isInstanceOf(RejectedExecutionException.class);
Поскольку выполнение первой задачи занимает много времени, исполнитель отклоняет вторую задачу.
3.2. Политика вызывающего абонента
Вместо асинхронного запуска задачи в другом потоке эта политика заставляет вызывающий поток выполнять задачу :
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(() -> waitFor(250));
long startTime = System.currentTimeMillis();
executor.execute(() -> waitFor(500));
long blockedDuration = System.currentTimeMillis() - startTime;
assertThat(blockedDuration).isGreaterThanOrEqualTo(500);
После отправки первой задачи исполнитель больше не может принимать новые задачи. Поэтому вызывающий поток блокируется до тех пор, пока не вернется вторая задача.
Политика caller-runs позволяет легко реализовать простую форму регулирования . То есть медленный потребитель может замедлить быстрого производителя, чтобы контролировать поток отправки задач.
3.3. Отменить политику
Политика отмены автоматически отбрасывает новую задачу, если она не может ее отправить :
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));
assertThat(queue.poll(200, MILLISECONDS)).isNull();
Здесь вторая задача публикует простое сообщение в очередь. Так как у него никогда не будет шанса выполниться, очередь остается пустой, даже несмотря на то, что мы блокируем ее в течение некоторого времени.
3.4. Отменить старую политику
Политика discard-oldest сначала удаляет задачу из головы очереди, а затем повторно отправляет новую задачу :
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).containsExactlyInAnyOrder("Second", "Third");
На этот раз мы используем ограниченную очередь, которая может содержать только две задачи. Вот что происходит, когда мы отправляем эти четыре задачи:
- Первые задачи загружают один поток на 100 миллисекунд.
- Исполнитель успешно ставит в очередь вторую и третью задачи
- Когда приходит четвертая задача, политика discard-oldest удаляет самую старую задачу, чтобы освободить место для этой новой.
Политика discard-oldest и приоритетные очереди плохо сочетаются друг с другом. Поскольку глава приоритетной очереди имеет наивысший приоритет, мы можем просто потерять самую важную задачу .
3.5. Пользовательская политика
Также можно предоставить пользовательскую политику насыщения, просто реализовав интерфейс RejectedExecutionHandler :
class GrowPolicy implements RejectedExecutionHandler {
private final Lock lock = new ReentrantLock();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
lock.lock();
try {
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
} finally {
lock.unlock();
}
executor.submit(r);
}
}
В этом примере, когда исполнитель становится насыщенным, мы увеличиваем максимальный размер пула на единицу, а затем повторно отправляем ту же задачу:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new GrowPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).contains("First", "Second", "Third");
Как и ожидалось, все четыре задачи выполнены.
3.6. Неисправность
Политики насыщения применяются не только к перегруженным исполнителям, но и ко всем выключенным исполнителям :
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();
assertThatThrownBy(() -> executor.execute(() -> {}))
.isInstanceOf(RejectedExecutionException.class);
То же самое верно для всех исполнителей, находящихся в процессе остановки:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();
assertThatThrownBy(() -> executor.execute(() -> {}))
.isInstanceOf(RejectedExecutionException.class);
4. Вывод
В этом руководстве, во-первых, мы довольно быстро вспомнили о пулах потоков в Java. Затем, после введения насыщенных исполнителей, мы узнали, как и когда применять различные политики насыщения.
Как обычно, пример кода доступен на GitHub .