1. Обзор
В Java 8 появилась концепция потоков S как
эффективного способа выполнения массовых операций над данными. А параллельные потоки
можно получить в средах, поддерживающих параллелизм.
Эти потоки могут иметь повышенную производительность за счет накладных расходов на многопоточность.
В этом кратком руководстве мы рассмотрим одно из самых больших ограничений Stream
API и посмотрим, как заставить параллельный поток работать с пользовательским экземпляром ThreadPool , в качестве альтернативы —
есть библиотека, которая обрабатывает это .
2. Параллельный поток
Давайте начнем с простого примера — вызова метода parallelStream для любого из типов
Collection
— который вернет, возможно, параллельный Stream
:
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
Обработка по умолчанию, которая происходит в таком потоке
, использует ForkJoinPool.commonPool(),
пул потоков, совместно используемый всем приложением.
3. Пользовательский пул потоков
На самом деле мы можем передать пользовательский ThreadPool
при обработке потока
.
В следующем примере параллельный поток
использует пользовательский ThreadPool
для вычисления суммы длинных значений от 1 до 1 000 000 включительно:
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
Мы использовали конструктор ForkJoinPool
с уровнем параллелизма 4. Чтобы определить оптимальное значение для различных сред, необходимо провести некоторые эксперименты, но хорошее эмпирическое правило заключается в простом выборе числа на основе количества ядер вашего процессора.
Далее мы обрабатывали содержимое параллельного Stream
, суммируя их в вызове reduce .
Этот простой пример может не демонстрировать всю полезность использования пользовательского пула потоков, но преимущества становятся очевидными в ситуациях, когда мы не хотим связывать общий пул потоков с длительными задачами, такими как обработка данных из сетевого источника. – или общий пул потоков используется другими компонентами приложения.
Если мы запустим тестовый метод, описанный выше, он пройдет. Все идет нормально.
Однако если мы создадим экземпляр класса ForkJoinPool
в обычном методе так же, как в тестовом методе, это может привести к ошибке OutOfMemoryError
.
Далее давайте более подробно рассмотрим причину утечки памяти.
4. Остерегайтесь утечки памяти
Как мы говорили ранее, общий пул потоков по умолчанию используется всем приложением. Общий пул потоков — это статический экземпляр ThreadPool .
Поэтому утечки памяти не происходит, если мы используем пул потоков по умолчанию.
Теперь давайте рассмотрим наш метод тестирования. В тестовом методе мы создали объект ForkJoinPool.
Когда тестовый метод завершится, объект customThreadPool
не будет разыменовываться и собирать мусор — вместо этого он будет ожидать назначения новых задач .
То есть каждый раз, когда мы вызываем тестовый метод, будет создаваться новый объект customThreadPool
, который не будет выпущен.
Решение проблемы довольно простое: выключите
объект customThreadPool
после того, как мы выполнили метод:
try {
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
customThreadPool.shutdown();
}
5. Вывод
Мы кратко рассмотрели, как запустить параллельный поток
, используя собственный ThreadPool
. В правильной среде и при правильном использовании уровня параллелизма в определенных ситуациях можно получить прирост производительности.
Если мы создадим пользовательский ThreadPool
, мы должны иметь в виду вызов его метода shutdown()
, чтобы избежать утечки памяти.
Полные примеры кода, упомянутые в этой статье, можно найти на GitHub .