Перейти к основному содержимому

Когда использовать параллельный поток в Java

· 9 мин. чтения

Задача: Сумма двух чисел

Напишите функцию twoSum. Которая получает массив целых чисел nums и целую сумму target, а возвращает индексы двух чисел, сумма которых равна target. Любой набор входных данных имеет ровно одно решение, и вы не можете использовать один и тот же элемент дважды. Ответ можно возвращать в любом порядке...

ANDROMEDA

1. Обзор

В Java 8 появился Stream API , который позволяет легко перебирать коллекции как потоки данных. Также очень легко создавать потоки, которые выполняются параллельно и используют несколько ядер процессора.

Можно подумать, что всегда быстрее разделить работу на большее количество ядер. Но это часто не так.

В этом руководстве мы рассмотрим различия между последовательными и параллельными потоками. Сначала мы рассмотрим пул fork-join по умолчанию, используемый параллельными потоками.

Мы также рассмотрим последствия использования параллельного потока для производительности, включая локальность памяти и затраты на разделение/слияние.

Наконец, мы порекомендуем, когда имеет смысл преобразовать последовательный поток в параллельный.

2. Потоки в Java

Поток в Java — это просто оболочка вокруг источника данных , позволяющая нам выполнять массовые операции с данными удобным способом.

Он не хранит данные и не вносит никаких изменений в базовый источник данных. Скорее, он добавляет поддержку операций функционального стиля в конвейерах данных.

2.1. Последовательные потоки

По умолчанию любая потоковая операция в Java обрабатывается последовательно, если явно не указано, что она параллельна.

Последовательные потоки используют один поток для обработки конвейера:

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);

Вывод этого последовательного потока предсказуем. Элементы списка всегда будут печататься в упорядоченной последовательности:

1 main
2 main
3 main
4 main

2.2. Параллельные потоки

Любой поток в Java можно легко преобразовать из последовательного в параллельный.

Мы можем добиться этого, добавив метод parallel к последовательному потоку или создав поток, используя метод parallelStream коллекции :

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.parallelStream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);

Параллельные потоки позволяют нам выполнять код параллельно на отдельных ядрах. Конечным результатом является комбинация каждого отдельного результата.

Однако порядок исполнения находится вне нашего контроля. Он может меняться каждый раз, когда мы запускаем программу:

4 ForkJoinPool.commonPool-worker-3
2 ForkJoinPool.commonPool-worker-5
1 ForkJoinPool.commonPool-worker-7
3 main

3. Платформа разветвления

Параллельные потоки используют структуру fork-join и ее общий пул рабочих потоков.

Платформа fork-join была добавлена в java.util.concurrent в Java 7 для управления задачами между несколькими потоками.

3.1. Разделение источника

Платформа fork-join отвечает за разделение исходных данных между рабочими потоками и обработку обратного вызова при завершении задачи.

Давайте рассмотрим пример параллельного вычисления суммы целых чисел.

Мы воспользуемся методом сокращения и добавим пять к исходной сумме вместо того, чтобы начинать с нуля:

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);
assertThat(sum).isNotEqualTo(15);

В последовательном потоке результатом этой операции будет 15.

Но поскольку операция сокращения выполняется параллельно, число пять фактически добавляется в каждом рабочем потоке:

./3163824ade3cb12124198ca45f16a08b.png

Фактический результат может отличаться в зависимости от количества потоков, используемых в общем пуле fork-join.

Чтобы решить эту проблему, число пять должно быть добавлено вне параллельного потока:

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;
assertThat(sum).isEqualTo(15);

Поэтому нам нужно быть осторожными с тем, какие операции могут выполняться параллельно.

3.2. Общий пул потоков

Количество потоков в общем пуле равно количеству ядер процессора.

Однако API позволяет нам указать количество потоков, которые он будет использовать, передав параметр JVM:

-D java.util.concurrent.ForkJoinPool.common.parallelism=4

Важно помнить, что это глобальный параметр, который повлияет на все параллельные потоки и любые другие задачи разветвления, использующие общий пул. Мы настоятельно рекомендуем не изменять этот параметр, если у нас нет для этого веских причин.

3.3. Пользовательский пул потоков

Помимо общего пула потоков по умолчанию, также можно запустить параллельный поток в пользовательском пуле потоков :

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
int sum = customThreadPool.submit(
() -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();
customThreadPool.shutdown();
assertThat(sum).isEqualTo(10);

Обратите внимание, что использование общего пула потоков рекомендуется Oracle. У нас должна быть очень веская причина для запуска параллельных потоков в пользовательских пулах потоков.

4. Влияние на производительность

Параллельная обработка может быть полезна для полного использования нескольких ядер. Но нам также необходимо учитывать накладные расходы на управление несколькими потоками, локальность памяти, разделение источника и слияние результатов.

4.1. Накладные расходы

Давайте посмотрим на пример целочисленного потока.

Мы запустим тест на последовательной и параллельной операции сокращения:

IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);

При таком простом сокращении суммы преобразование последовательного потока в параллельный приводило к ухудшению производительности:

Benchmark                                                     Mode  Cnt        Score        Error  Units
SplittingCosts.sourceSplittingIntStreamParallel avgt 25 35476,283 ± 204,446 ns/op
SplittingCosts.sourceSplittingIntStreamSequential avgt 25 68,274 ± 0,963 ns/op

Причина этого в том, что иногда накладные расходы на управление потоками, источниками и результатами обходятся дороже, чем выполнение фактической работы.

4.2. Разделение затрат

Равномерное разделение источника данных — необходимая затрата для обеспечения параллельного выполнения, но некоторые источники данных разделяются лучше, чем другие.

Давайте продемонстрируем это, используя ArrayList и LinkedList :

private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
private static final List<Integer> linkedListOfNumbers = new LinkedList<>();

static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
arrayListOfNumbers.add(i);
linkedListOfNumbers.add(i);
});
}

Мы запустим тест последовательной и параллельной операции сокращения для двух типов списков:

arrayListOfNumbers.stream().reduce(0, Integer::sum)
arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);
linkedListOfNumbers.stream().reduce(0, Integer::sum);
linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);

Наши результаты показывают, что преобразование последовательного потока в параллельный приносит выигрыш в производительности только для ArrayList :

Benchmark                                                     Mode  Cnt        Score        Error  Units
DifferentSourceSplitting.differentSourceArrayListParallel avgt 25 2004849,711 ± 5289,437 ns/op
DifferentSourceSplitting.differentSourceArrayListSequential avgt 25 5437923,224 ± 37398,940 ns/op
DifferentSourceSplitting.differentSourceLinkedListParallel avgt 25 13561609,611 ± 275658,633 ns/op
DifferentSourceSplitting.differentSourceLinkedListSequential avgt 25 10664918,132 ± 254251,184 ns/op

Причина этого в том, что массивы можно разбивать дешево и равномерно , в то время как LinkedList не имеет ни одного из этих свойств. TreeMap и HashSet разделяются лучше, чем LinkedList , но не так хорошо, как массивы.

4.3. Объединение затрат

Каждый раз, когда мы разделяем исходный код для параллельных вычислений, нам также нужно обязательно объединять результаты в конце.

Давайте запустим тест на последовательном и параллельном потоке с суммированием и группировкой как разными операциями слияния:

arrayListOfNumbers.stream().reduce(0, Integer::sum);
arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);
arrayListOfNumbers.stream().collect(Collectors.toSet());
arrayListOfNumbers.stream().parallel().collect(Collectors.toSet())

Наши результаты показывают, что преобразование последовательного потока в параллельный приносит выигрыш в производительности только для операции суммирования:

Benchmark                                                     Mode  Cnt        Score        Error  Units
MergingCosts.mergingCostsGroupingParallel avgt 25 135093312,675 ± 4195024,803 ns/op
MergingCosts.mergingCostsGroupingSequential avgt 25 70631711,489 ± 1517217,320 ns/op
MergingCosts.mergingCostsSumParallel avgt 25 2074483,821 ± 7520,402 ns/op
MergingCosts.mergingCostsSumSequential avgt 25 5509573,621 ± 60249,942 ns/op

Операция слияния действительно дешева для некоторых операций, таких как сокращение и сложение, но операции слияния, такие как группировка в наборы или карты, могут быть довольно дорогими.

4.4. Локализация памяти

Современные компьютеры используют сложную многоуровневую кэш-память, чтобы хранить часто используемые данные рядом с процессором. Когда обнаруживается линейный шаблон доступа к памяти, аппаратное обеспечение предварительно выбирает следующую строку данных, предполагая, что она, вероятно, скоро понадобится.

Параллелизм дает преимущества в производительности, когда мы можем заставлять ядра процессора выполнять полезную работу. Поскольку ожидание промахов кеша не является полезной работой, нам нужно рассматривать пропускную способность памяти как ограничивающий фактор.

Давайте продемонстрируем это, используя два массива, один из которых использует примитивный тип, а другой — объектный тип данных:

private static final int[] intArray = new int[1_000_000];
private static final Integer[] integerArray = new Integer[1_000_000];

static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
intArray[i-1] = i;
integerArray[i-1] = i;
});
}

Мы запустим тест последовательной и параллельной операции сокращения на двух массивах:

Arrays.stream(intArray).reduce(0, Integer::sum);
Arrays.stream(intArray).parallel().reduce(0, Integer::sum);
Arrays.stream(integerArray).reduce(0, Integer::sum);
Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);

Наши результаты показывают, что преобразование последовательного потока в параллельный приносит немного больше преимуществ в производительности при использовании массива примитивов:

Benchmark                                                     Mode  Cnt        Score        Error  Units
MemoryLocalityCosts.localityIntArrayParallel avgt 25 116247,787 ± 283,150 ns/op
MemoryLocalityCosts.localityIntArraySequential avgt 25 293142,385 ± 2526,892 ns/op
MemoryLocalityCosts.localityIntegerArrayParallel avgt 25 2153732,607 ± 16956,463 ns/op
MemoryLocalityCosts.localityIntegerArraySequential avgt 25 5134866,640 ± 148283,942 ns/op

Массив примитивов обеспечивает наилучшую локальность, возможную в Java. В общем, чем больше указателей в нашей структуре данных, тем больше нагрузки мы оказываем на память для извлечения ссылочных объектов. Это может отрицательно сказаться на распараллеливании, поскольку несколько ядер одновременно извлекают данные из памяти.

4.5. NQ - модель

Oracle представила простую модель, которая может помочь нам определить, может ли параллелизм повысить производительность. В модели NQ N обозначает количество элементов исходных данных, а Q представляет собой объем вычислений, выполняемых для каждого элемента данных.

Чем больше произведение N*Q , тем больше вероятность, что мы получим прирост производительности за счет распараллеливания. Для задач с тривиально малым Q , таких как суммирование чисел, эмпирическое правило состоит в том, что N должно быть больше 10 000. По мере увеличения количества вычислений размер данных, необходимый для повышения производительности за счет параллелизма, уменьшается.

5. Когда использовать параллельные потоки

Как мы видели, нам нужно быть очень внимательными при использовании параллельных потоков.

Параллелизм может повысить производительность в определенных случаях использования. Но параллельные потоки нельзя рассматривать как магический усилитель производительности. Таким образом, последовательные потоки по-прежнему должны использоваться по умолчанию во время разработки.

Последовательный поток может быть преобразован в параллельный, когда у нас есть реальные требования к производительности. Учитывая эти требования, мы должны сначала провести измерение производительности и рассмотреть параллелизм как возможную стратегию оптимизации.

Большой объем данных и множество вычислений, выполняемых для каждого элемента, указывают на то, что параллелизм может быть хорошим вариантом.

С другой стороны, небольшой объем данных, неравномерное разбиение исходников, дорогостоящие операции слияния и плохая локальность памяти указывают на потенциальную проблему при параллельном выполнении.

6. Заключение

В этой статье мы исследовали разницу между последовательными и параллельными потоками в Java. Мы узнали, что параллельные потоки используют пул fork-join по умолчанию и его рабочие потоки.

Затем мы увидели, что параллельные потоки не всегда приносят выигрыш в производительности. Мы учли накладные расходы на управление несколькими потоками, локальность памяти, разделение источника и слияние результатов. Мы видели, что массивы — отличный источник данных для параллельного выполнения, потому что они обеспечивают наилучшую возможную локальность и могут быть дешево и равномерно разделены.

Наконец, мы рассмотрели модель NQ и рекомендовали использовать параллельные потоки только тогда, когда у нас есть реальные требования к производительности.

Как всегда, исходный код доступен на GitHub .