Перейти к основному содержимому

Руководство по платформе Fork/Join в Java

· 6 мин. чтения

1. Обзор

Платформа fork/join была представлена в Java 7. Она предоставляет инструменты, помогающие ускорить параллельную обработку, пытаясь использовать все доступные процессорные ядра, что достигается с помощью подхода «разделяй и властвуй » .

На практике это означает, что фреймворк сначала «разветвляется» , рекурсивно разбивая задачу на более мелкие независимые подзадачи, пока они не станут достаточно простыми для асинхронного выполнения.

После этого начинается часть «объединения» , в которой результаты всех подзадач рекурсивно объединяются в один результат, или в случае задачи, которая возвращает void, программа просто ждет, пока каждая подзадача будет выполнена.

Чтобы обеспечить эффективное параллельное выполнение, платформа fork/join использует пул потоков, называемый ForkJoinPool , который управляет рабочими потоками типа ForkJoinWorkerThread .

2. Вилка присоединения к пулу

ForkJoinPool — это сердце фреймворка. Это реализация ExecutorService , которая управляет рабочими потоками и предоставляет нам инструменты для получения информации о состоянии и производительности пула потоков.

Рабочие потоки могут одновременно выполнять только одну задачу, но ForkJoinPool не создает отдельный поток для каждой отдельной подзадачи. Вместо этого каждый поток в пуле имеет свою собственную двустороннюю очередь (или deque , произносится как колода ), в которой хранятся задачи.

Эта архитектура жизненно важна для балансировки рабочей нагрузки потока с помощью алгоритма кражи работы.

2.1. Алгоритм кражи работы

Проще говоря — свободные потоки пытаются «украсть» работу из деков занятых потоков.

По умолчанию рабочий поток получает задачи от головы своей очереди. Когда он пуст, поток берет задачу из хвоста дека другого занятого потока или из глобальной очереди входа, так как именно там, вероятно, находятся самые большие объемы работы.

Такой подход сводит к минимуму вероятность того, что потоки будут конкурировать за задачи. Это также уменьшает количество раз, когда потоку придется искать работу, поскольку он сначала работает с самыми большими доступными блоками работы.

2.2. `` Создание экземпляра ForkJoinPool ****

В Java 8 самый удобный способ получить доступ к экземпляру ForkJoinPool — использовать его статический метод commonPool (). Как следует из названия, это предоставит ссылку на общий пул, который является пулом потоков по умолчанию для каждого ForkJoinTask .

Согласно документации Oracle , использование предопределенного общего пула снижает потребление ресурсов, поскольку это препятствует созданию отдельного пула потоков для каждой задачи.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Такого же поведения можно добиться в Java 7, создав ForkJoinPool и назначив его общедоступному статическому полю служебного класса:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Теперь к нему легко получить доступ:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

С помощью конструкторов ForkJoinPool можно создать собственный пул потоков с определенным уровнем параллелизма, фабрикой потоков и обработчиком исключений. В приведенном выше примере пул имеет уровень параллелизма 2. Это означает, что пул будет использовать 2 процессорных ядра.

3. ВилкаJoinTask<V>

ForkJoinTask — это базовый тип для задач, выполняемых внутри ForkJoinPool. На практике следует расширить один из двух его подклассов: RecursiveAction для пустых задач и RecursiveTask<V> для задач, возвращающих значение. `У них обоих есть абстрактный метод calculate()` , в котором определяется логика задачи.

3.1. Рекурсивное действие — пример

В приведенном ниже примере обрабатываемая единица работы представлена строкой с именем workload . В демонстрационных целях эта задача не имеет смысла: она просто записывает введенные данные в верхний регистр и регистрирует их.

Чтобы продемонстрировать поведение ветвления платформы, пример разбивает задачу, если рабочая нагрузка .length() превышает указанный порог ` , используя метод createSubtask()` .

Строка рекурсивно делится на подстроки, создавая экземпляры CustomRecursiveTask , основанные на этих подстроках.

В результате метод возвращает List<CustomRecursiveAction>.

Список отправляется в ForkJoinPool с помощью метода invokeAll() :

public class CustomRecursiveAction extends RecursiveAction {

private String workload = "";
private static final int THRESHOLD = 4;

private static Logger logger =
Logger.getAnonymousLogger();

public CustomRecursiveAction(String workload) {
this.workload = workload;
}

@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}

private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();

String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());

subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));

return subtasks;
}

private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}

Этот шаблон можно использовать для разработки собственных классов RecursiveAction . Для этого создайте объект, представляющий общий объем работы, выберите подходящий порог, определите метод разделения работы и определите метод выполнения работы.

3.2. Рекурсивная задача<V>

Для задач, возвращающих значение, логика здесь аналогична, за исключением того, что результат для каждой подзадачи объединяется в один результат:

public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;

private static final int THRESHOLD = 20;

public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}

@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}

private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}

private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}

В этом примере работа представлена массивом, хранящимся в поле arr класса CustomRecursiveTask . Метод createSubtasks() рекурсивно делит задачу на более мелкие части работы, пока каждая часть не станет меньше порогового значения . Затем метод invokeAll() отправляет подзадачи в общий пул и возвращает список Future .

Чтобы инициировать выполнение, для каждой подзадачи вызывается метод join() .

В этом примере это достигается с помощью Stream API Java 8 ; метод sum() используется как представление объединения подрезультатов в окончательный результат.

4. Отправка задач в ForkJoinPool

Для отправки задач в пул потоков можно использовать несколько подходов.

Метод submit() или execute() (варианты их использования одинаковы):

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

Метод invoke() разветвляет задачу и ожидает результата и не требует ручного присоединения:

int result = forkJoinPool.invoke(customRecursiveTask);

Метод invokeAll() является наиболее удобным способом отправки последовательности задач ForkJoinTasks в пул ForkJoinPool. Он принимает задачи в качестве параметров (две задачи, var args или коллекцию), разветвляется, а затем возвращает коллекцию объектов Future в том порядке, в котором они были созданы.

В качестве альтернативы вы можете использовать отдельные методы fork() и join() . Метод fork() отправляет задачу в пул, но не запускает ее выполнение. Для этого необходимо использовать метод join() . В случае RecursiveAction join() не возвращает ничего , кроме null ; для RecursiveTask<V> возвращает результат выполнения задачи:

customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

В нашем примере RecursiveTask<V> мы использовали метод invokeAll() для отправки последовательности подзадач в пул. Ту же работу можно выполнить с помощью fork() и join() , хотя это имеет последствия для упорядочения результатов.

Чтобы избежать путаницы, рекомендуется использовать метод invokeAll() для отправки более одной задачи в ForkJoinPool.

5. Выводы

Использование фреймворка fork/join может ускорить обработку больших задач, но для достижения этого результата необходимо следовать некоторым рекомендациям:

  • Используйте как можно меньше пулов потоков — в большинстве случаев лучшим решением является использование одного пула потоков для каждого приложения или системы.
  • Используйте общий пул потоков по умолчанию, если не требуется специальная настройка .
  • Используйте разумный порог для разделения ForkJoinTask на подзадачи
  • Избегайте любых блокировок в ваших ForkJoinTasks

Примеры, использованные в этой статье, доступны в связанном репозитории GitHub .