Перейти к основному содержимому

Пользовательские пулы потоков в параллельных потоках Java 8

· 3 мин. чтения

1. Обзор

В Java 8 появилась концепция потоков S как эффективного способа выполнения массовых операций над данными. А параллельные потоки можно получить в средах, поддерживающих параллелизм.

Эти потоки могут иметь повышенную производительность за счет накладных расходов на многопоточность.

В этом кратком руководстве мы рассмотрим одно из самых больших ограничений Stream API и посмотрим, как заставить параллельный поток работать с пользовательским экземпляром ThreadPool , в качестве альтернативы — есть библиотека, которая обрабатывает это .

2. Параллельный поток

Давайте начнем с простого примера — вызова метода parallelStream для любого из типов Collection — который вернет, возможно, параллельный Stream :

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();

assertTrue(parallelStream.isParallel());
}

Обработка по умолчанию, которая происходит в таком потоке , использует ForkJoinPool.commonPool(), пул потоков, совместно используемый всем приложением.

3. Пользовательский пул потоков

На самом деле мы можем передать пользовательский ThreadPool при обработке потока .

В следующем примере параллельный поток использует пользовательский ThreadPool для вычисления суммы длинных значений от 1 до 1 000 000 включительно:

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {

long firstNum = 1;
long lastNum = 1_000_000;

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();

assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

Мы использовали конструктор ForkJoinPool с уровнем параллелизма 4. Чтобы определить оптимальное значение для различных сред, необходимо провести некоторые эксперименты, но хорошее эмпирическое правило заключается в простом выборе числа на основе количества ядер вашего процессора.

Далее мы обрабатывали содержимое параллельного Stream , суммируя их в вызове reduce .

Этот простой пример может не демонстрировать всю полезность использования пользовательского пула потоков, но преимущества становятся очевидными в ситуациях, когда мы не хотим связывать общий пул потоков с длительными задачами, такими как обработка данных из сетевого источника. – или общий пул потоков используется другими компонентами приложения.

Если мы запустим тестовый метод, описанный выше, он пройдет. Все идет нормально.

Однако если мы создадим экземпляр класса ForkJoinPool в обычном методе так же, как в тестовом методе, это может привести к ошибке OutOfMemoryError .

Далее давайте более подробно рассмотрим причину утечки памяти.

4. Остерегайтесь утечки памяти

Как мы говорили ранее, общий пул потоков по умолчанию используется всем приложением. Общий пул потоков — это статический экземпляр ThreadPool .

Поэтому утечки памяти не происходит, если мы используем пул потоков по умолчанию.

Теперь давайте рассмотрим наш метод тестирования. В тестовом методе мы создали объект ForkJoinPool. Когда тестовый метод завершится, объект customThreadPool не будет разыменовываться и собирать мусор — вместо этого он будет ожидать назначения новых задач .

То есть каждый раз, когда мы вызываем тестовый метод, будет создаваться новый объект customThreadPool , который не будет выпущен.

Решение проблемы довольно простое: выключите объект customThreadPool после того, как мы выполнили метод:

try {
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
customThreadPool.shutdown();
}

5. Вывод

Мы кратко рассмотрели, как запустить параллельный поток , используя собственный ThreadPool . В правильной среде и при правильном использовании уровня параллелизма в определенных ситуациях можно получить прирост производительности.

Если мы создадим пользовательский ThreadPool , мы должны иметь в виду вызов его метода shutdown() , чтобы избежать утечки памяти.

Полные примеры кода, упомянутые в этой статье, можно найти на GitHub .