1. Обзор
В этом всеобъемлющем руководстве мы рассмотрим практическое использование потоков Java 8 от создания до параллельного выполнения.
Чтобы понять этот материал, читатели должны иметь базовые знания Java 8 (лямбда-выражения, необязательно,
ссылки на методы) и Stream API. Чтобы лучше ознакомиться с этими темами, ознакомьтесь с нашими предыдущими статьями: « Новые возможности в Java 8 » и « Введение в потоки Java 8» .
2. Создание потока
Существует много способов создать экземпляр потока из разных источников. После создания экземпляр не будет изменять свой источник, что позволяет создавать несколько экземпляров из одного источника.
2.1. Пустой поток
Мы должны использовать метод empty()
в случае создания пустого потока:
Stream<String> streamEmpty = Stream.empty();
Мы часто используем метод empty()
при создании, чтобы избежать возврата null
для потоков без элемента:
public Stream<String> streamOf(List<String> list) {
return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}
2.2. Поток сбора
Мы также можем создать поток любого типа Collection
( Collection, List, Set
):
Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();
2.3. Поток массива
Массив также может быть источником потока:
Stream<String> streamOfArray = Stream.of("a", "b", "c");
Мы также можем создать поток из существующего массива или части массива:
String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3);
2.4. Поток.строитель()
При использовании билдера в правой части оператора необходимо дополнительно указать нужный тип, иначе метод build()
создаст экземпляр Stream<Object>:
Stream<String> streamBuilder =
Stream.<String>builder().add("a").add("b").add("c").build();
2.5. Поток.генерировать()
Метод generate()
принимает Supplier<T>
для генерации элемента. Поскольку результирующий поток бесконечен, разработчик должен указать желаемый размер, иначе метод generate()
будет работать до тех пор, пока не будет исчерпан лимит памяти:
Stream<String> streamGenerated =
Stream.generate(() -> "element").limit(10);
Приведенный выше код создает последовательность из десяти строк со значением «элемент».
2.6. Поток.итерация()
Другой способ создания бесконечного потока — использование метода iterate()
:
Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);
Первый элемент результирующего потока является первым параметром метода iterate()
. При создании каждого следующего элемента указанная функция применяется к предыдущему элементу. В приведенном выше примере вторым элементом будет 42.
2.7. Поток примитивов
Java 8 предлагает возможность создавать потоки из трех примитивных типов: int, long
и double.
Поскольку Stream<T>
является универсальным интерфейсом, и нельзя использовать примитивы в качестве параметра типа с универсальными шаблонами, были созданы три новых специальных интерфейса: IntStream, LongStream, DoubleStream.
Использование новых интерфейсов избавляет от ненужного автобокса, что позволяет повысить производительность:
IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);
Метод range(int startInclusive, int endExclusive)
создает упорядоченный поток от первого параметра до второго параметра. Он увеличивает значение последующих элементов с шагом, равным 1. Результат не включает последний параметр, это просто верхняя граница последовательности.
Метод rangeClosed(int startInclusive, int endInclusive)
делает то же самое с той лишь разницей, что включается второй элемент. Мы можем использовать эти два метода для генерации любого из трех типов потоков примитивов.
Начиная с Java 8 класс Random
предоставляет широкий набор методов для генерации потоков примитивов. Например, следующий код создает DoubleStream, состоящий
из трех элементов:
Random random = new Random();
DoubleStream doubleStream = random.doubles(3);
2.8. Поток строки
Мы также можем использовать String
в качестве источника для создания потока с помощью метода chars()
класса String
. Поскольку в JDK нет интерфейса для CharStream
, вместо этого мы используем IntStream
для представления потока символов.
IntStream streamOfChars = "abc".chars();
В следующем примере строка
разбивается на подстроки в соответствии с указанным RegEx
:
Stream<String> streamOfString =
Pattern.compile(", ").splitAsStream("a, b, c");
2.9. Поток файла
Кроме того, файлы
класса Java NIO позволяют нам генерировать Stream<String>
текстового файла с помощью метода lines() .
Каждая строка текста становится элементом потока:
Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset =
Files.lines(path, Charset.forName("UTF-8"));
Набор символов
может быть указан как аргумент метода lines()
.
3. Ссылка на поток
Мы можем создать экземпляр потока и иметь доступную ссылку на него, пока вызываются только промежуточные операции. Выполнение терминальной операции делает поток недоступным .
Чтобы продемонстрировать это, мы на время забудем, что наилучшей практикой является цепочка операций. Помимо излишней многословности, технически следующий код действителен:
Stream<String> stream =
Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();
Однако попытка повторного использования той же ссылки после вызова терминальной операции вызовет исключение IllegalStateException:
Optional<String> firstElement = stream.findFirst();
Поскольку IllegalStateException
является RuntimeException
, компилятор не будет сигнализировать о проблеме. Поэтому очень важно помнить, что потоки Java 8 нельзя использовать повторно.
Такое поведение логично. Мы разработали потоки для применения конечной последовательности операций к источнику элементов в функциональном стиле, а не для хранения элементов.
Итак, чтобы предыдущий код работал правильно, необходимо внести некоторые изменения:
List<String> elements =
Stream.of("a", "b", "c").filter(element -> element.contains("b"))
.collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();
4. Потоковый конвейер
Чтобы выполнить последовательность операций над элементами источника данных и агрегировать их результаты, нам нужны три части: источник , промежуточная(ые) операция(и) и терминальная операция.
Промежуточные операции возвращают новый измененный поток. Например, чтобы создать новый поток из существующего без нескольких элементов, следует использовать метод skip() :
Stream<String> onceModifiedStream =
Stream.of("abcd", "bbcd", "cbcd").skip(1);
Если нам нужно более одной модификации, мы можем объединить промежуточные операции. Предположим, что нам также нужно заменить каждый элемент текущего Stream<String>
подстрокой из первых нескольких символов. Мы можем сделать это, объединив методы skip()
и map()
:
Stream<String> twiceModifiedStream =
stream.skip(1).map(element -> element.substring(0, 3));
Как мы видим, метод map()
принимает в качестве параметра лямбда-выражение. Если мы хотим узнать больше о лямбда-выражениях, мы можем взглянуть на наш учебник Lambda Expressions and Functional Interfaces: Tips and Best Practices .
Поток сам по себе ничего не стоит; пользователя интересует результат работы терминала, который может быть значением некоторого типа или действием, примененным к каждому элементу потока. Мы можем использовать только одну терминальную операцию на поток.
Правильный и наиболее удобный способ использования потоков — потоковый конвейер, представляющий собой цепочку из источника потока, промежуточных операций и терминальной операции:
List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
.map(element -> element.substring(0, 3)).sorted().count();
5. Ленивый призыв
Промежуточные операции ленивы. Это означает, что они будут вызываться только в том случае, если это необходимо для выполнения терминальной операции.
Например, давайте вызовем метод wasCalled()
,
который увеличивает внутренний счетчик при каждом вызове:
private long counter;
private void wasCalled() {
counter++;
}
Теперь вызовем метод wasCalled
()
из операции filter()
:
List<String> list = Arrays.asList(“abc1”, “abc2”, “abc3”);
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
wasCalled();
return element.contains("2");
});
Поскольку у нас есть источник из трех элементов, мы можем предположить, что метод filter()
будет вызываться три раза, а значение переменной счетчика
будет равно 3. Однако выполнение этого кода вообще не меняет счетчик
, он по-прежнему ноль, поэтому метод filter()
даже не вызывался ни разу. Причина, по которой отсутствует работа терминала.
Давайте немного перепишем этот код, добавив операцию map()
и терминальную операцию findFirst().
Также добавим возможность отслеживать порядок вызовов методов с помощью логирования:
Optional<String> stream = list.stream().filter(element -> {
log.info("filter() was called");
return element.contains("2");
}).map(element -> {
log.info("map() was called");
return element.toUpperCase();
}).findFirst();
Полученный журнал показывает, что мы дважды вызывали метод filter() и один раз — метод
map()
. Это связано с тем, что конвейер выполняется вертикально. В нашем примере первый элемент потока не удовлетворял предикату фильтра. Затем мы вызвали метод filter()
для второго элемента, который прошел фильтр. Не вызывая filter()
для третьего элемента, мы спустились по конвейеру к методу map() .
Операция findFirst()
удовлетворяет только одному элементу. Таким образом, в этом конкретном примере ленивый вызов позволил нам избежать двух вызовов методов, одного для filter()
и одного для map().
6. Порядок исполнения
С точки зрения производительности правильный порядок является одним из наиболее важных аспектов цепочки операций в потоковом конвейере:
long size = list.stream().map(element -> {
wasCalled();
return element.substring(0, 3);
}).skip(2).count();
Выполнение этого кода увеличит значение счетчика на три. Это означает, что мы вызвали метод map()
потока три раза, но значение размера
равно единице. Таким образом, результирующий поток имеет только один элемент, и мы без всякой причины выполнили дорогостоящие операции map() два из трех раз.
Если мы изменим порядок методов skip ()
и map()
,
счетчик увеличится только на единицу .
Поэтому мы будем вызывать метод map()
только один раз:
long size = list.stream().skip(2).map(element -> {
wasCalled();
return element.substring(0, 3);
}).count();
Это приводит нас к следующему правилу: промежуточные операции, уменьшающие размер потока, должны располагаться перед операциями, применяемыми к каждому элементу. Таким образом, нам нужно держать такие методы, как s kip(), filter()
и Different()
, в верхней части нашего потокового конвейера.
7. Сокращение потока
API имеет множество терминальных операций, которые объединяют поток в тип или в примитив: count(), max(), min()
и sum().
Однако эти операции работают в соответствии с предопределенной реализацией. Так что , если разработчику нужно настроить механизм сокращения Stream? Есть два метода, которые позволяют нам это сделать: методы reduce()
и collect()
.
7.1. Метод уменьшения (
)
Существует три варианта этого метода, которые отличаются своими сигнатурами и возвращаемыми типами. Они могут иметь следующие параметры:
identity — начальное значение для аккумулятора или значение по умолчанию, если поток пуст и накапливать нечего.
аккумулятор – функция, задающая логику агрегации элементов. Поскольку аккумулятор создает новое значение для каждого шага сокращения, количество новых значений равно размеру потока, и полезно только последнее значение. Это не очень хорошо для производительности.
объединитель — функция, которая агрегирует результаты аккумулятора. Мы вызываем объединитель только в параллельном режиме, чтобы уменьшить результаты аккумуляторов из разных потоков.
Теперь давайте посмотрим на эти три метода в действии:
OptionalInt reduced =
IntStream.range(1, 4).reduce((a, b) -> a + b);
уменьшено
= 6 (1 + 2 + 3)
int reducedTwoParams =
IntStream.range(1, 4).reduce(10, (a, b) -> a + b);
уменьшенныйTwoParams
= 16 (10 + 1 + 2 + 3)
int reducedParams = Stream.of(1, 2, 3)
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
Результат будет тот же, что и в предыдущем примере (16), а логина не будет, значит, объединитель не вызывался. Чтобы объединитель работал, поток должен быть параллельным:
int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
Здесь результат другой (36), а объединитель вызывался дважды. Здесь редукция работает по следующему алгоритму: трижды запускался аккумулятор, прибавляя каждый элемент потока к identity
. Эти действия выполняются параллельно. В результате имеем (10+1=11; 10+2=12; 10+3=13;). Теперь объединитель может объединить эти три результата. Для этого нужно две итерации (12 + 13 = 25; 25 + 11 = 36).
7.2. Метод сбора(
)
Сокращение потока также может быть выполнено с помощью другой терминальной операции — метода collect() .
Он принимает аргумент типа Collector,
который определяет механизм редукции. Для наиболее распространенных операций уже созданы предопределенные сборщики. Доступ к ним можно получить с помощью типа Collectors
.
В этом разделе мы будем использовать следующий список
в качестве источника для всех потоков:
List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
new Product(14, "orange"), new Product(13, "lemon"),
new Product(23, "bread"), new Product(13, "sugar"));
Преобразование потока в коллекцию
( Collection, List
или Set
):
List<String> collectorCollection =
productList.stream().map(Product::getName).collect(Collectors.toList());
Сокращение к строке
:
String listToString = productList.stream().map(Product::getName)
.collect(Collectors.joining(", ", "[", "]"));
Метод joiner()
может иметь от одного до трех параметров (разделитель, префикс, суффикс). Самое удобное в использовании joiner()
то, что разработчику не нужно проверять, достигает ли поток своего конца, чтобы применить суффикс и не применять разделитель. Коллектор
позаботится об этом.
Обработка среднего значения всех числовых элементов потока:
double averagePrice = productList.stream()
.collect(Collectors.averagingInt(Product::getPrice));
Обработка суммы всех числовых элементов потока:
int summingPrice = productList.stream()
.collect(Collectors.summingInt(Product::getPrice));
Методы averagingXX(), summingXX()
и summarizingXX()
могут работать с примитивами ( int, long, double
) и их классами-оболочками ( Integer, Long, Double
). Еще одной мощной особенностью этих методов является обеспечение отображения. В результате разработчику не нужно использовать дополнительную операцию map()
перед методом collect() .
Сбор статистической информации об элементах потока:
IntSummaryStatistics statistics = productList.stream()
.collect(Collectors.summarizingInt(Product::getPrice));
Используя результирующий экземпляр типа IntSummaryStatistics
, разработчик может создать статистический отчет, применив метод toString()
. Результатом будет строка
, общая для этого «IntSummaryStatistics{count=5, sum=86, min=13, medium=17,200000, max=23}».
Также легко извлечь из этого объекта отдельные значения для количества, суммы,
минимума и среднего значения
, применяя методы getCount(), getSum(), getMin(), getAverage()
и getMax().
Все эти значения могут быть извлечены из одного конвейера.
Группировка элементов потока по заданной функции:
Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
.collect(Collectors.groupingBy(Product::getPrice));
В примере выше поток был сведен к Карте
, которая группирует все товары по их цене.
Разделение элементов потока на группы по некоторому предикату:
Map<Boolean, List<Product>> mapPartioned = productList.stream()
.collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
Нажатие коллектора для выполнения дополнительной трансформации:
Set<Product> unmodifiableSet = productList.stream()
.collect(Collectors.collectingAndThen(Collectors.toSet(),
Collections::unmodifiableSet));
В данном конкретном случае сборщик преобразовал поток в Set
, а затем создал из него неизменяемый Set
.
Пользовательский коллектор:
Если по какой-то причине необходимо создать пользовательский сборщик, самый простой и наименее подробный способ сделать это — использовать метод ()
типа Collector.
Collector<Product, ?, LinkedList<Product>> toLinkedList =
Collector.of(LinkedList::new, LinkedList::add,
(first, second) -> {
first.addAll(second);
return first;
});
LinkedList<Product> linkedListOfPersons =
productList.stream().collect(toLinkedList);
В этом примере экземпляр Collector
был сокращен до LinkedList
<Person>.
8. Параллельные потоки
До Java 8 распараллеливание было сложным. Появление ExecutorService
и ForkJoin немного
упростило жизнь разработчику, но все равно стоило помнить, как создать конкретный экзекьютор, как его запустить и так далее. Java 8 представила способ реализации параллелизма в функциональном стиле.
API позволяет нам создавать параллельные потоки, которые выполняют операции в параллельном режиме. Когда источником потока является Коллекция
или массив
, этого можно добиться с помощью метода parallelStream()
:
Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
.map(product -> product.getPrice() * 12)
.anyMatch(price -> price > 200);
Если источником потока является что-то отличное от Collection
или массива
, следует использовать метод parallel() :
IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();
Под капотом Stream API автоматически используется инфраструктура ForkJoin
для параллельного выполнения операций. По умолчанию будет использоваться общий пул потоков, и нет возможности (по крайней мере, на данный момент) назначить ему какой-либо пользовательский пул потоков. Этого можно избежать, используя собственный набор параллельных коллекторов.
При использовании потоков в параллельном режиме избегайте блокирующих операций. Также лучше использовать параллельный режим, когда задачам требуется одинаковое количество времени для выполнения. Если одна задача длится намного дольше другой, это может замедлить рабочий процесс всего приложения.
Поток в параллельном режиме можно преобразовать обратно в последовательный режим с помощью метода sequence()
:
IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel();
9. Заключение
Stream API — это мощный, но простой для понимания набор инструментов для обработки последовательности элементов. При правильном использовании это позволяет нам сократить огромное количество стандартного кода, создавать более читаемые программы и повысить производительность приложения.
В большинстве примеров кода, показанных в этой статье, мы оставили потоки неиспользованными (мы не применяли метод close()
или терминальную операцию). В реальном приложении не оставляйте созданный поток неиспользованным, так как это приведет к утечке памяти.
Полные примеры кода, сопровождающие эту статью, доступны на GitHub.