Перейти к основному содержимому

Потоковое упорядочение в Java

· 6 мин. чтения

1. Обзор

В этом руководстве мы подробно рассмотрим, как различные варианты использования Java Stream API влияют на порядок, в котором поток генерирует, обрабатывает и собирает данные .

Мы также рассмотрим, как порядок влияет на производительность .

2. Порядок встречи

Проще говоря, порядок встречи это порядок, в котором поток встречает данные .

2.1. Столкновение с порядком источников сбора

Коллекция , которую мы выбираем в качестве источника, влияет на порядок встреч в потоке.

Чтобы проверить это, давайте просто создадим два потока.

Наш первый создан из списка , который имеет внутренний порядок.

Наш второй создан из TreeSet , которого нет.

Затем мы собираем выходные данные каждого потока в массив для сравнения результатов.

@Test
public void givenTwoCollections_whenStreamedSequentially_thenCheckOutputDifferent() {
List<String> list = Arrays.asList("B", "A", "C", "D", "F");
Set<String> set = new TreeSet<>(list);

Object[] listOutput = list.stream().toArray();
Object[] setOutput = set.stream().toArray();

assertEquals("[B, A, C, D, F]", Arrays.toString(listOutput));
assertEquals("[A, B, C, D, F]", Arrays.toString(setOutput));
}

Как мы можем сказать из нашего примера, TreeSet не сохранил порядок нашей входной последовательности, поэтому шифрует порядок встречи Stream .

Если наш поток упорядочен, не имеет значения, обрабатываются ли наши данные последовательно или параллельно; реализация будет поддерживать порядок встречи Stream .

Когда мы повторяем наш тест с использованием параллельных потоков, мы получаем тот же результат:

@Test
public void givenTwoCollections_whenStreamedInParallel_thenCheckOutputDifferent() {
List<String> list = Arrays.asList("B", "A", "C", "D", "F");
Set<String> set = new TreeSet<>(list);

Object[] listOutput = list.stream().parallel().toArray();
Object[] setOutput = set.stream().parallel().toArray();

assertEquals("[B, A, C, D, F]", Arrays.toString(listOutput));
assertEquals("[A, B, C, D, F]", Arrays.toString(setOutput));
}

2.2. Удаление заказа

В любой момент мы можем явно удалить ограничение порядка с помощью неупорядоченного метода .

Например, давайте объявим TreeSet :

Set<Integer> set = new TreeSet<>(
Arrays.asList(-9, -5, -4, -2, 1, 2, 4, 5, 7, 9, 12, 13, 16, 29, 23, 34, 57, 102, 230));

И если мы стримим без вызова unordered :

set.stream().parallel().limit(5).toArray();

Тогда естественный порядок TreeSet сохраняется:

[-9, -5, -4, -2, 1]

Но если мы явно удалим порядок:

set.stream().unordered().parallel().limit(5).toArray();

Тогда вывод другой:

[1, 4, 7, 9, 23]

Причина двоякая: во-первых, поскольку последовательные потоки обрабатывают данные по одному элементу за раз, unordered сам по себе мало влияет. Однако когда мы вызывали parallel , мы также влияли на вывод.

3. Промежуточные операции

Мы также можем влиять на порядок потоков с помощью промежуточных операций .

В то время как большинство промежуточных операций будут поддерживать порядок потока, некоторые по своей природе изменят его.

Например, мы можем повлиять на порядок потоков с помощью сортировки:

@Test
public void givenUnsortedStreamInput_whenStreamSorted_thenCheckOrderChanged() {
List<Integer> list = Arrays.asList(-3, 10, -4, 1, 3);

Object[] listOutput = list.stream().toArray();
Object[] listOutputSorted = list.stream().sorted().toArray();

assertEquals("[-3, 10, -4, 1, 3]", Arrays.toString(listOutput));
assertEquals("[-4, -3, 1, 3, 10]", Arrays.toString(listOutputSorted));
}

unordered и empty — еще два примера промежуточных операций, которые в конечном итоге изменят порядок Stream.

4. Терминальные операции

Наконец, мы можем влиять на порядок в зависимости от используемой терминальной операции .

4.1. ForEach против ForEachOrdered

Может показаться, что ForEach и ForEachOrdered обеспечивают одинаковую функциональность, но у них есть одно ключевое отличие: ForEachOrdered гарантирует сохранение порядка Stream .

Если мы объявим список:

List<String> list = Arrays.asList("B", "A", "C", "D", "F");

И используйте forEachOrdered после распараллеливания:

list.stream().parallel().forEachOrdered(e -> logger.log(Level.INFO, e));

Затем упорядочивается вывод:

INFO: B
INFO: A
INFO: C
INFO: D
INFO: F

Однако, если мы используем forEach:

list.stream().parallel().forEach(e -> logger.log(Level.INFO, e));

Тогда вывод неупорядочен :

INFO: C
INFO: F
INFO: B
INFO: D
INFO: A

ForEach регистрирует элементы в порядке их поступления из каждого потока. Второй поток с его методом ForEachOrdered ожидает завершения каждого предыдущего потока перед вызовом метода журнала .

4.2. Собирать

Когда мы используем метод collect для агрегирования выходных данных Stream , важно отметить, что выбранная нами коллекция повлияет на порядок.

Например, изначально неупорядоченные коллекции , такие как TreeSet , не будут подчиняться порядку `` вывода Stream :

@Test
public void givenSameCollection_whenStreamCollected_checkOutput() {
List<String> list = Arrays.asList("B", "A", "C", "D", "F");

List<String> collectionList = list.stream().parallel().collect(Collectors.toList());
Set<String> collectionSet = list.stream().parallel()
.collect(Collectors.toCollection(TreeSet::new));

assertEquals("[B, A, C, D, F]", collectionList.toString());
assertEquals("[A, B, C, D, F]", collectionSet.toString());
}

При запуске нашего кода мы видим, что порядок нашего Stream меняется путем сбора в Set.

4.3. Указание коллекций _

В случае, если мы собираем в неупорядоченную коллекцию, используя, скажем, Collectors.toMap , мы все еще можем обеспечить упорядочение, изменив реализацию наших методов Collectors , чтобы использовать реализацию Linked .

Во- первых, мы инициализируем наш список вместе с обычной версией метода toMap с двумя параметрами :

@Test
public void givenList_whenStreamCollectedToHashMap_thenCheckOrderChanged() {
List<String> list = Arrays.asList("A", "BB", "CCC");

Map<String, Integer> hashMap = list.stream().collect(Collectors
.toMap(Function.identity(), String::length));

Object[] keySet = hashMap.keySet().toArray();

assertEquals("[BB, A, CCC]", Arrays.toString(keySet));
}

Как и ожидалось, наша новая HashMap не сохранила исходный порядок входного списка, но давайте это изменим. ``

С нашим вторым потоком мы будем использовать версию метода toMap с 4 параметрами , чтобы указать нашему поставщику предоставить новый LinkedHashMap : ``

@Test
public void givenList_whenCollectedtoLinkedHashMap_thenCheckOrderMaintained(){
List<String> list = Arrays.asList("A", "BB", "CCC");

Map<String, Integer> linkedHashMap = list.stream().collect(Collectors.toMap(
Function.identity(),
String::length,
(u, v) -> u,
LinkedHashMap::new
));

Object[] keySet = linkedHashMap.keySet().toArray();

assertEquals("[A, BB, CCC]", Arrays.toString(keySet));
}

Эй, так намного лучше!

Нам удалось сохранить первоначальный порядок списка, собрав наши данные в LinkedHashMap .

5. Производительность

Если мы используем последовательные потоки, наличие или отсутствие порядка мало влияет на производительность нашей программы. Однако на параллельные потоки может сильно повлиять наличие упорядоченного потока .

Причина этого в том, что каждый поток должен ждать вычисления предыдущего элемента Stream .

Давайте попробуем продемонстрировать это, используя обвязку Java Microbenchmark , JMH, для измерения производительности.

В следующих примерах мы измерим затраты производительности на обработку упорядоченных и неупорядоченных параллельных потоков с некоторыми распространенными промежуточными операциями.

5.1. Отчетливый

Давайте настроим тест, используя отдельную функцию как для упорядоченных, так и для неупорядоченных потоков.

@Benchmark 
public void givenOrderedStreamInput_whenStreamDistinct_thenShowOpsPerMS() {
IntStream.range(1, 1_000_000).parallel().distinct().toArray();
}

@Benchmark
public void givenUnorderedStreamInput_whenStreamDistinct_thenShowOpsPerMS() {
IntStream.range(1, 1_000_000).unordered().parallel().distinct().toArray();
}

Когда мы нажимаем «Выполнить», мы видим разницу во времени, затрачиваемом на операцию:

Benchmark                        Mode  Cnt       Score   Error  Units
TestBenchmark.givenOrdered... avgt 2 222252.283 us/op
TestBenchmark.givenUnordered... avgt 2 78221.357 us/op

5.2. Фильтр ``

Далее мы будем использовать параллельный поток с простым методом фильтра для возврата каждого 10-го целого числа :

@Benchmark
public void givenOrderedStreamInput_whenStreamFiltered_thenShowOpsPerMS() {
IntStream.range(1, 100_000_000).parallel().filter(i -> i % 10 == 0).toArray();
}

@Benchmark
public void givenUnorderedStreamInput_whenStreamFiltered_thenShowOpsPerMS(){
IntStream.range(1,100_000_000).unordered().parallel().filter(i -> i % 10 == 0).toArray();
}

Интересно, что разница между нашими двумя потоками намного меньше, чем при использовании метода Different.

Benchmark                        Mode  Cnt       Score   Error  Units
TestBenchmark.givenOrdered... avgt 2 116333.431 us/op
TestBenchmark.givenUnordered... avgt 2 111471.676 us/op

6. Заключение

В этой статье мы рассмотрели `упорядочение потоков, уделив особое внимание **различным этапам процесса Stream` и тому, как каждый из них оказывает свое влияние** .

Наконец, мы увидели, как контракт заказа, размещенный в потоке , может повлиять на производительность параллельных потоков.

Как всегда, ознакомьтесь с полным набором примеров на GitHub .