1. Обзор
В этом руководстве мы подробно рассмотрим, как различные варианты использования Java Stream API влияют на порядок, в котором поток генерирует, обрабатывает и собирает данные .
Мы также рассмотрим, как порядок влияет на производительность .
2. Порядок встречи
Проще говоря, порядок встречи — это порядок, в котором поток
встречает данные .
2.1. Столкновение с порядком источников сбора
Коллекция , которую
мы выбираем в качестве источника, влияет на порядок встреч в потоке.
Чтобы проверить это, давайте просто создадим два потока.
Наш первый создан из списка
, который имеет внутренний порядок.
Наш второй создан из TreeSet
, которого нет.
Затем мы собираем выходные данные каждого потока
в массив
для сравнения результатов.
@Test
public void givenTwoCollections_whenStreamedSequentially_thenCheckOutputDifferent() {
List<String> list = Arrays.asList("B", "A", "C", "D", "F");
Set<String> set = new TreeSet<>(list);
Object[] listOutput = list.stream().toArray();
Object[] setOutput = set.stream().toArray();
assertEquals("[B, A, C, D, F]", Arrays.toString(listOutput));
assertEquals("[A, B, C, D, F]", Arrays.toString(setOutput));
}
Как мы можем сказать из нашего примера, TreeSet
не сохранил порядок нашей входной последовательности, поэтому шифрует порядок встречи Stream
.
Если наш поток
упорядочен, не имеет значения, обрабатываются ли наши данные последовательно или параллельно; реализация будет поддерживать порядок встречи Stream
.
Когда мы повторяем наш тест с использованием параллельных потоков, мы получаем тот же результат:
@Test
public void givenTwoCollections_whenStreamedInParallel_thenCheckOutputDifferent() {
List<String> list = Arrays.asList("B", "A", "C", "D", "F");
Set<String> set = new TreeSet<>(list);
Object[] listOutput = list.stream().parallel().toArray();
Object[] setOutput = set.stream().parallel().toArray();
assertEquals("[B, A, C, D, F]", Arrays.toString(listOutput));
assertEquals("[A, B, C, D, F]", Arrays.toString(setOutput));
}
2.2. Удаление заказа
В любой момент мы можем явно удалить ограничение порядка с помощью неупорядоченного
метода .
Например, давайте объявим TreeSet
:
Set<Integer> set = new TreeSet<>(
Arrays.asList(-9, -5, -4, -2, 1, 2, 4, 5, 7, 9, 12, 13, 16, 29, 23, 34, 57, 102, 230));
И если мы стримим без вызова unordered
:
set.stream().parallel().limit(5).toArray();
Тогда естественный порядок TreeSet
сохраняется:
[-9, -5, -4, -2, 1]
Но если мы явно удалим порядок:
set.stream().unordered().parallel().limit(5).toArray();
Тогда вывод другой:
[1, 4, 7, 9, 23]
Причина двоякая: во-первых, поскольку последовательные потоки обрабатывают данные по одному элементу за раз, unordered
сам по себе мало влияет. Однако когда мы вызывали parallel
, мы также влияли на вывод.
3. Промежуточные операции
Мы также можем влиять на порядок потоков с помощью промежуточных операций .
В то время как большинство промежуточных операций будут поддерживать порядок потока,
некоторые по своей природе изменят его.
Например, мы можем повлиять на порядок потоков с помощью сортировки:
@Test
public void givenUnsortedStreamInput_whenStreamSorted_thenCheckOrderChanged() {
List<Integer> list = Arrays.asList(-3, 10, -4, 1, 3);
Object[] listOutput = list.stream().toArray();
Object[] listOutputSorted = list.stream().sorted().toArray();
assertEquals("[-3, 10, -4, 1, 3]", Arrays.toString(listOutput));
assertEquals("[-4, -3, 1, 3, 10]", Arrays.toString(listOutputSorted));
}
unordered
и empty
— еще два примера промежуточных операций, которые в конечном итоге изменят порядок Stream.
4. Терминальные операции
Наконец, мы можем влиять на порядок в зависимости от используемой терминальной операции .
4.1. ForEach
против ForEachOrdered
Может показаться, что ForEach
и ForEachOrdered
обеспечивают одинаковую функциональность, но у них есть одно ключевое отличие: ForEachOrdered
гарантирует сохранение порядка Stream
.
Если мы объявим список:
List<String> list = Arrays.asList("B", "A", "C", "D", "F");
И используйте forEachOrdered
после распараллеливания:
list.stream().parallel().forEachOrdered(e -> logger.log(Level.INFO, e));
Затем упорядочивается вывод:
INFO: B
INFO: A
INFO: C
INFO: D
INFO: F
Однако, если мы используем forEach:
list.stream().parallel().forEach(e -> logger.log(Level.INFO, e));
Тогда вывод неупорядочен :
INFO: C
INFO: F
INFO: B
INFO: D
INFO: A
ForEach
регистрирует элементы в порядке их поступления из каждого потока. Второй поток
с его методом ForEachOrdered
ожидает завершения каждого предыдущего потока перед вызовом метода журнала .
4.2. Собирать
Когда мы используем метод collect
для агрегирования выходных данных Stream
, важно отметить, что выбранная нами коллекция
повлияет на порядок.
Например, изначально неупорядоченные коллекции
, такие как TreeSet
, не будут подчиняться порядку `` вывода Stream :
@Test
public void givenSameCollection_whenStreamCollected_checkOutput() {
List<String> list = Arrays.asList("B", "A", "C", "D", "F");
List<String> collectionList = list.stream().parallel().collect(Collectors.toList());
Set<String> collectionSet = list.stream().parallel()
.collect(Collectors.toCollection(TreeSet::new));
assertEquals("[B, A, C, D, F]", collectionList.toString());
assertEquals("[A, B, C, D, F]", collectionSet.toString());
}
При запуске нашего кода мы видим, что порядок нашего Stream
меняется путем сбора в Set.
4.3. Указание коллекций
_
В случае, если мы собираем в неупорядоченную коллекцию, используя, скажем, Collectors.toMap
, мы все еще можем обеспечить упорядочение, изменив реализацию наших методов Collectors
, чтобы использовать реализацию Linked .
Во- первых, мы инициализируем наш список вместе с обычной версией метода toMap
с двумя параметрами :
@Test
public void givenList_whenStreamCollectedToHashMap_thenCheckOrderChanged() {
List<String> list = Arrays.asList("A", "BB", "CCC");
Map<String, Integer> hashMap = list.stream().collect(Collectors
.toMap(Function.identity(), String::length));
Object[] keySet = hashMap.keySet().toArray();
assertEquals("[BB, A, CCC]", Arrays.toString(keySet));
}
Как и ожидалось, наша новая HashMap
не
сохранила исходный порядок входного списка, но давайте это изменим. ``
С нашим вторым потоком
мы будем использовать версию метода toMap с
4 параметрами , чтобы указать нашему поставщику
предоставить новый LinkedHashMap
:
``
@Test
public void givenList_whenCollectedtoLinkedHashMap_thenCheckOrderMaintained(){
List<String> list = Arrays.asList("A", "BB", "CCC");
Map<String, Integer> linkedHashMap = list.stream().collect(Collectors.toMap(
Function.identity(),
String::length,
(u, v) -> u,
LinkedHashMap::new
));
Object[] keySet = linkedHashMap.keySet().toArray();
assertEquals("[A, BB, CCC]", Arrays.toString(keySet));
}
Эй, так намного лучше!
Нам удалось сохранить первоначальный порядок списка, собрав наши данные в LinkedHashMap
.
5. Производительность
Если мы используем последовательные потоки, наличие или отсутствие порядка мало влияет на производительность нашей программы. Однако на параллельные потоки может сильно повлиять наличие упорядоченного потока
.
Причина этого в том, что каждый поток должен ждать вычисления предыдущего элемента Stream
.
Давайте попробуем продемонстрировать это, используя обвязку Java Microbenchmark , JMH, для измерения производительности.
В следующих примерах мы измерим затраты производительности на обработку упорядоченных и неупорядоченных параллельных потоков с некоторыми распространенными промежуточными операциями.
5.1. Отчетливый
Давайте настроим тест, используя отдельную
функцию как для упорядоченных, так и для неупорядоченных потоков.
@Benchmark
public void givenOrderedStreamInput_whenStreamDistinct_thenShowOpsPerMS() {
IntStream.range(1, 1_000_000).parallel().distinct().toArray();
}
@Benchmark
public void givenUnorderedStreamInput_whenStreamDistinct_thenShowOpsPerMS() {
IntStream.range(1, 1_000_000).unordered().parallel().distinct().toArray();
}
Когда мы нажимаем «Выполнить», мы видим разницу во времени, затрачиваемом на операцию:
Benchmark Mode Cnt Score Error Units
TestBenchmark.givenOrdered... avgt 2 222252.283 us/op
TestBenchmark.givenUnordered... avgt 2 78221.357 us/op
5.2. Фильтр
``
Далее мы будем использовать параллельный поток
с простым методом фильтра
для возврата каждого 10-го целого числа :
@Benchmark
public void givenOrderedStreamInput_whenStreamFiltered_thenShowOpsPerMS() {
IntStream.range(1, 100_000_000).parallel().filter(i -> i % 10 == 0).toArray();
}
@Benchmark
public void givenUnorderedStreamInput_whenStreamFiltered_thenShowOpsPerMS(){
IntStream.range(1,100_000_000).unordered().parallel().filter(i -> i % 10 == 0).toArray();
}
Интересно, что разница
между нашими двумя потоками намного меньше, чем при использовании метода Different.
Benchmark Mode Cnt Score Error Units
TestBenchmark.givenOrdered... avgt 2 116333.431 us/op
TestBenchmark.givenUnordered... avgt 2 111471.676 us/op
6. Заключение
В этой статье мы рассмотрели `упорядочение потоков, уделив особое внимание **различным этапам процесса
Stream` и тому, как каждый из них оказывает свое влияние** .
Наконец, мы увидели, как контракт заказа, размещенный в потоке
, может повлиять на производительность параллельных потоков.
Как всегда, ознакомьтесь с полным набором примеров на GitHub .