1. Обзор
В этом уроке мы рассмотрим концепцию кражи работы в Java .
2. Что такое кража работы?
Кража работы была введена в Java с целью уменьшения конкуренции в многопоточных приложениях . Это делается с помощью фреймворка fork/join .
2.1. Разделяй и властвуй подход
В структуре fork/join проблемы или задачи рекурсивно разбиваются на подзадачи . Затем подзадачи решаются индивидуально, а подрезультаты объединяются для формирования результата:
Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
2.2. Рабочие потоки
Сломанная задача решается с помощью рабочих потоков, предоставляемых пулом потоков . Каждый рабочий поток будет иметь подзадачи, за которые он отвечает. Они хранятся в двусторонних очередях ( deques ).
Каждый рабочий поток получает подзадачи из своей двухсторонней очереди, постоянно выталкивая подзадачу из вершины двухсторонней очереди. Когда очередь рабочего потока пуста, это означает, что все подзадачи были сняты и выполнены.
В этот момент рабочий поток случайным образом выбирает одноранговый поток из пула потоков, у которого он может «украсть» работу. Затем он использует подход «первым поступил — первым обслужен» (FIFO) для получения подзадач из хвостовой части дека жертвы.
3. Реализация фреймворка Fork/Join
Мы можем создать пул потоков для кражи работы, используя либо класс ForkJoinPool
, либо класс Executors
:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
ExecutorService workStealingPool = Executors.newWorkStealingPool();
Класс Executors
имеет перегруженный метод newWorkStealingPool
, который принимает целочисленный аргумент, представляющий уровень параллелизма .
Executors.newWorkStealingPool
— это абстракция ForkJoinPool.commonPool
. Единственное отличие состоит в том, что Executors.newWorkStealingPool
создает пул в асинхронном режиме, а ForkJoinPool.commonPool
— нет.
4. Синхронные и асинхронные пулы потоков
ForkJoinPool.commonPool
использует конфигурацию очереди «последним пришел — первым обслужен» (LIFO), тогда как Executors.newWorkStealingPool
использует подход «первым пришел — первым обслужен» (FIFO).
По словам Дуга Ли , подход FIFO имеет следующие преимущества перед LIFO:
- Это уменьшает конкуренцию за счет того, что стилеры действуют на противоположной стороне деки, чем владельцы.
- Он использует свойство рекурсивных алгоритмов «разделяй и властвуй» раннего создания «больших» задач.
Второй пункт выше означает, что можно дополнительно разбить более старую украденную задачу потоком, который ее украл.
Согласно документации по Java , установка для asyncMode значения
true
может подходить для использования с задачами в стиле событий, которые никогда не объединяются.
5. Рабочий пример — поиск простых чисел
Мы воспользуемся примером нахождения простых чисел из набора чисел, чтобы продемонстрировать преимущества времени вычислений при использовании фреймворка для кражи работы . Мы также покажем различия между использованием синхронных и асинхронных пулов потоков.
5.1. Проблема простых чисел
Поиск простых чисел из набора чисел может быть вычислительно затратным процессом. В основном это связано с размером набора чисел.
Класс PrimeNumbers
помогает нам находить простые числа:
public class PrimeNumbers extends RecursiveAction {
private int lowerBound;
private int upperBound;
private int granularity;
static final List<Integer> GRANULARITIES
= Arrays.asList(1, 10, 100, 1000, 10000);
private AtomicInteger noOfPrimeNumbers;
PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.granularity = granularity;
this.noOfPrimeNumbers = noOfPrimeNumbers;
}
// other constructors and methods
private List<PrimeNumbers> subTasks() {
List<PrimeNumbers> subTasks = new ArrayList<>();
for (int i = 1; i <= this.upperBound / granularity; i++) {
int upper = i * granularity;
int lower = (upper - granularity) + 1;
subTasks.add(new PrimeNumbers(lower, upper, noOfPrimeNumbers));
}
return subTasks;
}
@Override
protected void compute() {
if (((upperBound + 1) - lowerBound) > granularity) {
ForkJoinTask.invokeAll(subTasks());
} else {
findPrimeNumbers();
}
}
void findPrimeNumbers() {
for (int num = lowerBound; num <= upperBound; num++) {
if (isPrime(num)) {
noOfPrimeNumbers.getAndIncrement();
}
}
}
public int noOfPrimeNumbers() {
return noOfPrimeNumbers.intValue();
}
}
Несколько важных замечаний об этом классе:
- Он расширяет
RecursiveAction
, что позволяет нам реализовать методвычисления
, используемый в вычислительных задачах с использованием пула потоков. - Он рекурсивно разбивает задачи на подзадачи на основе значения
детализации .
- Конструкторы принимают
нижние
иверхние
граничные значения, которые контролируют диапазон чисел, для которых мы хотим определить простые числа. - Это позволяет нам определять простые числа, используя либо пул потоков, перехватывающих работу, либо один поток.
5.2. Ускорение решения проблемы с помощью пулов потоков
Давайте определим простые числа в однопоточном режиме, а также с использованием пулов рабочих потоков.
Во-первых, давайте посмотрим на однопоточный подход :
PrimeNumbers primes = new PrimeNumbers(10000);
primes.findPrimeNumbers();
А теперь подход ForkJoinPool.commonPool
:
PrimeNumbers primes = new PrimeNumbers(10000);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
pool.shutdown();
Наконец, мы рассмотрим подход Executors.newWorkStealingPool
:
PrimeNumbers primes = new PrimeNumbers(10000);
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();
Мы используем метод вызова класса
ForkJoinPool
для передачи задач в пул потоков. Этот метод принимает экземпляры подклассов RecursiveAction
. Используя Java Microbench Harness , мы сравниваем эти разные подходы друг с другом с точки зрения среднего времени на операцию:
# Run complete. Total time: 00:04:50
Benchmark Mode Cnt Score Error Units
PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark avgt 20 119.885 ± 9.917 ms/op
PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark avgt 20 119.791 ± 7.811 ms/op
PrimeNumbersUnitTest.Benchmarker.singleThread avgt 20 475.964 ± 7.929 ms/op
Понятно, что и ForkJoinPool.commonPool
, и Executors.newWorkStealingPool
позволяют определять простые числа быстрее, чем при однопоточном подходе.
Структура пула fork/join позволяет нам разбить задачу на подзадачи. Мы разбили набор из 10 000 целых чисел на пакеты 1-100, 101-200, 201-300 и так далее. Затем мы определили простые числа для каждой партии и сделали общее количество простых чисел доступным с помощью нашего метода noOfPrimeNumbers
.
5.3. Кража работы для вычислений
С синхронным пулом потоков ForkJoinPool.commonPool
помещает потоки в пул, пока задача все еще выполняется. В результате уровень кражи работы не зависит от уровня детализации задачи.
Асинхронный Executors.newWorkStealingPool
`` является более управляемым, что позволяет зависеть от уровня детализации задач.
Мы получаем уровень кражи работы с помощью getStealCount
класса ForkJoinPool
:
long steals = forkJoinPool.getStealCount();
Определение количества кражи работы для Executors.newWorkStealingPool
и ForkJoinPool.commonPool
дает нам разное поведение:
Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]
ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]
При изменении степени детализации с мелкой на грубую (от 1 до 10 000) для Executors.newWorkStealingPool
уровень кражи работы снижается . Поэтому счетчик кражи равен единице, когда задача не разбита (детализация 10 000).
ForkJoinPool.commonPool ведет
себя иначе. Уровень кражи работы всегда высок и не сильно зависит от изменения детализации задач.
С технической точки зрения, наш пример с простыми числами поддерживает асинхронную обработку событийных задач. Это связано с тем, что наша реализация не требует объединения результатов.
Можно утверждать, что Executors.newWorkStealingPool
предлагает наилучшее использование ресурсов для решения проблемы.
6. Заключение
В этой статье мы рассмотрели воровство работы и то, как его применять с помощью фреймворка fork/join. Мы также рассмотрели примеры кражи работы и то, как это может улучшить время обработки и использование ресурсов.
Как всегда, полный исходный код примера доступен на GitHub .