Перейти к основному содержимому

Руководство по краже работы в Java

· 6 мин. чтения

1. Обзор

В этом уроке мы рассмотрим концепцию кражи работы в Java .

2. Что такое кража работы?

Кража работы была введена в Java с целью уменьшения конкуренции в многопоточных приложениях . Это делается с помощью фреймворка fork/join .

2.1. Разделяй и властвуй подход

В структуре fork/join проблемы или задачи рекурсивно разбиваются на подзадачи . Затем подзадачи решаются индивидуально, а подрезультаты объединяются для формирования результата:

Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}

2.2. Рабочие потоки

Сломанная задача решается с помощью рабочих потоков, предоставляемых пулом потоков . Каждый рабочий поток будет иметь подзадачи, за которые он отвечает. Они хранятся в двусторонних очередях ( deques ).

Каждый рабочий поток получает подзадачи из своей двухсторонней очереди, постоянно выталкивая подзадачу из вершины двухсторонней очереди. Когда очередь рабочего потока пуста, это означает, что все подзадачи были сняты и выполнены.

В этот момент рабочий поток случайным образом выбирает одноранговый поток из пула потоков, у которого он может «украсть» работу. Затем он использует подход «первым поступил — первым обслужен» (FIFO) для получения подзадач из хвостовой части дека жертвы.

3. Реализация фреймворка Fork/Join

Мы можем создать пул потоков для кражи работы, используя либо класс ForkJoinPool , либо класс Executors :

ForkJoinPool commonPool = ForkJoinPool.commonPool();
ExecutorService workStealingPool = Executors.newWorkStealingPool();

Класс Executors имеет перегруженный метод newWorkStealingPool , который принимает целочисленный аргумент, представляющий уровень параллелизма .

Executors.newWorkStealingPool — это абстракция ForkJoinPool.commonPool . Единственное отличие состоит в том, что Executors.newWorkStealingPool создает пул в асинхронном режиме, а ForkJoinPool.commonPool — нет.

4. Синхронные и асинхронные пулы потоков

ForkJoinPool.commonPool использует конфигурацию очереди «последним пришел — первым обслужен» (LIFO), тогда как Executors.newWorkStealingPool использует подход «первым пришел — первым обслужен» (FIFO).

По словам Дуга Ли , подход FIFO имеет следующие преимущества перед LIFO:

  • Это уменьшает конкуренцию за счет того, что стилеры действуют на противоположной стороне деки, чем владельцы.
  • Он использует свойство рекурсивных алгоритмов «разделяй и властвуй» раннего создания «больших» задач.

Второй пункт выше означает, что можно дополнительно разбить более старую украденную задачу потоком, который ее украл.

Согласно документации по Java , установка для asyncMode значения true может подходить для использования с задачами в стиле событий, которые никогда не объединяются.

5. Рабочий пример — поиск простых чисел

Мы воспользуемся примером нахождения простых чисел из набора чисел, чтобы продемонстрировать преимущества времени вычислений при использовании фреймворка для кражи работы . Мы также покажем различия между использованием синхронных и асинхронных пулов потоков.

5.1. Проблема простых чисел

Поиск простых чисел из набора чисел может быть вычислительно затратным процессом. В основном это связано с размером набора чисел.

Класс PrimeNumbers помогает нам находить простые числа:

public class PrimeNumbers extends RecursiveAction {

private int lowerBound;
private int upperBound;
private int granularity;
static final List<Integer> GRANULARITIES
= Arrays.asList(1, 10, 100, 1000, 10000);
private AtomicInteger noOfPrimeNumbers;

PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.granularity = granularity;
this.noOfPrimeNumbers = noOfPrimeNumbers;
}

// other constructors and methods

private List<PrimeNumbers> subTasks() {
List<PrimeNumbers> subTasks = new ArrayList<>();

for (int i = 1; i <= this.upperBound / granularity; i++) {
int upper = i * granularity;
int lower = (upper - granularity) + 1;
subTasks.add(new PrimeNumbers(lower, upper, noOfPrimeNumbers));
}
return subTasks;
}

@Override
protected void compute() {
if (((upperBound + 1) - lowerBound) > granularity) {
ForkJoinTask.invokeAll(subTasks());
} else {
findPrimeNumbers();
}
}

void findPrimeNumbers() {
for (int num = lowerBound; num <= upperBound; num++) {
if (isPrime(num)) {
noOfPrimeNumbers.getAndIncrement();
}
}
}

public int noOfPrimeNumbers() {
return noOfPrimeNumbers.intValue();
}
}

Несколько важных замечаний об этом классе:

  • Он расширяет RecursiveAction , что позволяет нам реализовать метод вычисления , используемый в вычислительных задачах с использованием пула потоков.
  • Он рекурсивно разбивает задачи на подзадачи на основе значения детализации .
  • Конструкторы принимают нижние и верхние граничные значения, которые контролируют диапазон чисел, для которых мы хотим определить простые числа.
  • Это позволяет нам определять простые числа, используя либо пул потоков, перехватывающих работу, либо один поток.

5.2. Ускорение решения проблемы с помощью пулов потоков

Давайте определим простые числа в однопоточном режиме, а также с использованием пулов рабочих потоков.

Во-первых, давайте посмотрим на однопоточный подход :

PrimeNumbers primes = new PrimeNumbers(10000);
primes.findPrimeNumbers();

А теперь подход ForkJoinPool.commonPool :

PrimeNumbers primes = new PrimeNumbers(10000);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
pool.shutdown();

Наконец, мы рассмотрим подход Executors.newWorkStealingPool :

PrimeNumbers primes = new PrimeNumbers(10000);
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();

Мы используем метод вызова класса ForkJoinPool для передачи задач в пул потоков. Этот метод принимает экземпляры подклассов RecursiveAction . Используя Java Microbench Harness , мы сравниваем эти разные подходы друг с другом с точки зрения среднего времени на операцию:

# Run complete. Total time: 00:04:50

Benchmark Mode Cnt Score Error Units
PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark avgt 20 119.885 ± 9.917 ms/op
PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark avgt 20 119.791 ± 7.811 ms/op
PrimeNumbersUnitTest.Benchmarker.singleThread avgt 20 475.964 ± 7.929 ms/op

Понятно, что и ForkJoinPool.commonPool , и Executors.newWorkStealingPool позволяют определять простые числа быстрее, чем при однопоточном подходе.

Структура пула fork/join позволяет нам разбить задачу на подзадачи. Мы разбили набор из 10 000 целых чисел на пакеты 1-100, 101-200, 201-300 и так далее. Затем мы определили простые числа для каждой партии и сделали общее количество простых чисел доступным с помощью нашего метода noOfPrimeNumbers .

5.3. Кража работы для вычислений

С синхронным пулом потоков ForkJoinPool.commonPool помещает потоки в пул, пока задача все еще выполняется. В результате уровень кражи работы не зависит от уровня детализации задачи.

Асинхронный Executors.newWorkStealingPool `` является более управляемым, что позволяет зависеть от уровня детализации задач.

Мы получаем уровень кражи работы с помощью getStealCount класса ForkJoinPool :

long steals = forkJoinPool.getStealCount();

Определение количества кражи работы для Executors.newWorkStealingPool и ForkJoinPool.commonPool дает нам разное поведение:

Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]

ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]

При изменении степени детализации с мелкой на грубую (от 1 до 10 000) для Executors.newWorkStealingPool уровень кражи работы снижается . Поэтому счетчик кражи равен единице, когда задача не разбита (детализация 10 000).

ForkJoinPool.commonPool ведет себя иначе. Уровень кражи работы всегда высок и не сильно зависит от изменения детализации задач.

С технической точки зрения, наш пример с простыми числами поддерживает асинхронную обработку событийных задач. Это связано с тем, что наша реализация не требует объединения результатов.

Можно утверждать, что Executors.newWorkStealingPool предлагает наилучшее использование ресурсов для решения проблемы.

6. Заключение

В этой статье мы рассмотрели воровство работы и то, как его применять с помощью фреймворка fork/join. Мы также рассмотрели примеры кражи работы и то, как это может улучшить время обработки и использование ресурсов.

Как всегда, полный исходный код примера доступен на GitHub .