Перейти к основному содержимому

Руководство по библиотеке параллельных сборщиков Java

· 5 мин. чтения

1. Введение

Parallel-collectors — это небольшая библиотека, предоставляющая набор сборщиков Java Stream API, которые обеспечивают параллельную обработку, в то же время обходя основные недостатки стандартных Parallel Streams.

2. Зависимости Maven

Если мы хотим начать использовать библиотеку, нам нужно добавить одну запись в файл pom.xml Maven :

<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>1.1.0</version>
</dependency>

Или одну строку в файле сборки Gradle:

compile 'com.pivovarit:parallel-collectors:1.1.0'

Новейшую версию можно найти на Maven Central .

3. Предостережения относительно параллельных потоков

Параллельные потоки были одним из основных моментов Java 8, но оказалось, что они применимы исключительно к интенсивной обработке ЦП.

Причиной этого был тот факт, что Parallel Streams внутренне поддерживались общим ForkJoinPool для всей JVM , который обеспечивал ограниченный параллелизм и использовался всеми Parallel Streams, работающими на одном экземпляре JVM.

Например, представьте, что у нас есть список идентификаторов, и мы хотим использовать их для получения списка пользователей, и эта операция требует больших затрат.

Для этого мы могли бы использовать Parallel Streams:

List<Integer> ids = Arrays.asList(1, 2, 3); 
List<String> results = ids.parallelStream()
.map(i -> fetchById(i)) // each operation takes one second
.collect(Collectors.toList());

System.out.println(results); // [user-1, user-2, user-3]

И действительно, мы видим заметное ускорение. Но это становится проблематичным, если мы начинаем выполнять несколько параллельных блокирующих операций… параллельно. Это может быстро насытить пул и привести к огромным задержкам. Вот почему важно создавать перегородки, создавая отдельные пулы потоков, чтобы предотвратить влияние несвязанных задач на выполнение друг друга.

Чтобы предоставить пользовательский экземпляр ForkJoinPool , мы могли бы использовать описанный здесь трюк , но этот подход опирался на недокументированный хак и был ошибочным до JDK10. Подробнее мы можем прочитать в самом выпуске — [JDK8190974] .

4. Параллельные коллекторы в действии

Параллельные коллекторы, как следует из названия, — это просто стандартные коллекторы Stream API, которые позволяют выполнять дополнительные операции параллельно на этапе collect() .

Класс ParallelCollectors (отражающий класс Collectors ) — это фасад, обеспечивающий доступ ко всему функционалу библиотеки.

Если бы мы хотели повторить приведенный выше пример, мы могли бы просто написать:

ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
.collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]

Результат тот же, однако мы смогли предоставить собственный пул потоков, указать собственный уровень параллелизма, и результат прибыл в экземпляре CompletableFuture , не блокируя текущий поток.

Стандартные параллельные потоки, с другой стороны, не могли достичь ни того, ни другого.

4.1. ParallelCollectors.parallelToList/ToSet()

Насколько это интуитивно понятно, если мы хотим параллельно обрабатывать Stream и собирать результаты в List или Set , мы можем просто использовать ParallelCollectors.parallelToList или parallelToSet :

List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.stream()
.collect(parallelToList(i -> fetchById(i), executor, 4))
.join();

4.2. ParallelCollectors.parallelToMap()

Если мы хотим собрать элементы Stream в экземпляр Map , как и в случае с Stream API, нам нужно предоставить два преобразователя:

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
.join(); // {1=user-1, 2=user-2, 3=user-3}

Мы также можем предоставить пользовательский экземпляр Map Supplier :

Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
.join();

И собственная стратегия разрешения конфликтов:

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
.join();

4.3. ParallelCollectors.parallelToCollection()

Аналогично предыдущему, мы можем передать нашего пользовательского поставщика коллекций , если мы хотим получить результаты, упакованные в наш пользовательский контейнер:

List<String> results = ids.stream()
.collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
.join();

4.4. ParallelCollectors.parallelToStream()

Если вышеперечисленного недостаточно, мы можем получить экземпляр Stream и продолжить настраиваемую обработку там:

Map<Integer, List<String>> results = ids.stream()
.collect(parallelToStream(i -> fetchById(i), executor, 4))
.thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
.join();

4.5. ParallelCollectors.parallel()

Это позволяет нам передавать результаты в порядке завершения:

ids.stream()
.collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
.forEach(System.out::println);

// user-1
// user-3
// user-2

В этом случае мы можем ожидать, что сборщик каждый раз будет возвращать разные результаты, поскольку мы ввели случайную задержку обработки.

4.6. ParallelCollectors.parallelOrdered()

Это средство позволяет передавать результаты так же, как и выше, но сохраняет первоначальный порядок:

ids.stream()
.collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
.forEach(System.out::println);

// user-1
// user-2
// user-3

В этом случае сборщик всегда будет поддерживать порядок, но может работать медленнее, чем указано выше.

5. Ограничения

На момент написания параллельные сборщики не работают с бесконечными потоками, даже если используются операции короткого замыкания — это конструктивное ограничение, налагаемое внутренними компонентами Stream API. Проще говоря, потоки обрабатывают коллекторы как операции без короткого замыкания, поэтому потоку необходимо обработать все восходящие элементы, прежде чем он будет завершен.

Другое ограничение заключается в том, что операции короткого замыкания не прерывают оставшиеся задачи после короткого замыкания.

6. Заключение

Мы увидели, как библиотека параллельных сборщиков позволяет нам выполнять параллельную обработку с помощью пользовательских сборщиков Java Stream API и CompletableFutures для использования пользовательских пулов потоков, параллелизма и неблокирующего стиля CompletableFutures.

Как всегда, фрагменты кода доступны на GitHub .

Для дальнейшего чтения смотрите библиотеку parallel-collectors на GitHub, блог автора и аккаунт автора в Twitter .