Перейти к основному содержимому

Руководство по Stream.reduce()

· 8 мин. чтения

1. Обзор

Stream API предоставляет богатый набор промежуточных, редукционных и терминальных функций, которые также поддерживают распараллеливание.

В частности, операции потока сокращения позволяют нам получить один единственный результат из последовательности элементов , многократно применяя операцию объединения к элементам в последовательности.

В этом руководстве мы рассмотрим операцию Stream.reduce() общего назначения и рассмотрим ее в некоторых конкретных случаях использования.

2. Ключевые понятия: тождество, аккумулятор и сумматор.

Прежде чем мы углубимся в использование операции Stream.reduce() , давайте разобьем элементы-участники операции на отдельные блоки. Так мы легче поймем роль, которую играет каждый из них.

  • Identity — элемент, который является начальным значением операции сокращения и результатом по умолчанию, если поток пуст.
  • Аккумулятор — функция, которая принимает два параметра: частичный результат операции редукции и следующий элемент потока.
  • Combiner - функция, используемая для объединения частичного результата операции сокращения, когда сокращение распараллелено или когда существует несоответствие между типами аргументов аккумулятора и типами реализации аккумулятора.

3. Использование Stream.reduce()

Чтобы лучше понять функциональность элементов тождества, аккумулятора и сумматора, давайте рассмотрим несколько основных примеров:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int result = numbers
.stream()
.reduce(0, (subtotal, element) -> subtotal + element);
assertThat(result).isEqualTo(21);

В этом `` случае целочисленное значение 0 является идентификатором. Он хранит начальное значение операции сокращения, а также результат по умолчанию, когда поток целочисленных значений пуст.

Аналогично, лямбда-выражение :

subtotal, element -> subtotal + element

является аккумулятором , поскольку он принимает частичную сумму значений Integer и следующего элемента в потоке.

Чтобы сделать код еще более лаконичным, мы можем использовать ссылку на метод вместо лямбда-выражения:

int result = numbers.stream().reduce(0, Integer::sum);
assertThat(result).isEqualTo(21);

Конечно, мы можем использовать операцию reduce() для потоков, содержащих другие типы элементов.

Например, мы можем использовать метод reduce() для массива элементов String и объединить их в один результат:

List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
String result = letters
.stream()
.reduce("", (partialString, element) -> partialString + element);
assertThat(result).isEqualTo("abcde");

Точно так же мы можем переключиться на версию, которая использует ссылку на метод:

String result = letters.stream().reduce("", String::concat);
assertThat(result).isEqualTo("abcde");

Давайте воспользуемся операцией reduce() для соединения элементов массива букв в верхнем регистре:

String result = letters
.stream()
.reduce(
"", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase());
assertThat(result).isEqualTo("ABCDE");

Кроме того, мы можем использовать reduce() в параллельном потоке (подробнее об этом позже):

List<Integer> ages = Arrays.asList(25, 30, 45, 28, 32);
int computedAges = ages.parallelStream().reduce(0, (a, b) -> a + b, Integer::sum);

Когда поток выполняется параллельно, среда выполнения Java разбивает поток на несколько подпотоков. В таких случаях нам нужно использовать функцию для объединения результатов подпотоков в один. Это роль объединителя — в приведенном выше фрагменте это ссылка на метод Integer::sum .

Как ни странно, этот код не скомпилируется:

List<User> users = Arrays.asList(new User("John", 30), new User("Julie", 35));
int computedAges =
users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());

В этом случае у нас есть поток объектов User , а типы аргументов-аккумуляторов — Integer и User. Однако реализация аккумулятора представляет собой сумму целых чисел, поэтому компилятор просто не может определить тип пользовательского параметра.

Мы можем решить эту проблему с помощью объединителя:

int result = users.stream()
.reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
assertThat(result).isEqualTo(65);

Проще говоря, если мы используем последовательные потоки и типы аргументов накопителя и типы его реализации совпадают, нам не нужно использовать объединитель.

4. Параллельное сокращение

Как мы узнали ранее, мы можем использовать reduce() для параллельных потоков.

Когда мы используем параллельные потоки, мы должны убедиться, что reduce() или любые другие агрегатные операции, выполняемые над потоками:

  • ассоциативный: на результат не влияет порядок операндов
  • невмешательство: операция не влияет на источник данных
  • без сохранения состояния и детерминированность: операция не имеет состояния и выдает один и тот же результат для заданного ввода.

Мы должны выполнить все эти условия, чтобы предотвратить непредсказуемые результаты.

Как и ожидалось, операции, выполняемые с параллельными потоками, включая reduce() , выполняются параллельно, что позволяет использовать преимущества многоядерных аппаратных архитектур.

По очевидным причинам, параллельные потоки гораздо более производительны, чем последовательные. Тем не менее, они могут быть излишними, если операции, применяемые к потоку, не требуют больших затрат или количество элементов в потоке невелико.

Конечно, параллельные потоки — правильный путь, когда нам нужно работать с большими потоками и выполнять дорогостоящие агрегатные операции.

Давайте создадим простой эталонный тест JMH (Java Microbenchmark Harness) и сравним соответствующее время выполнения при использовании операции reduce() в последовательном и параллельном потоках:

@State(Scope.Thread)
private final List<User> userList = createUsers();

@Benchmark
public Integer executeReduceOnParallelizedStream() {
return this.userList
.parallelStream()
.reduce(
0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}

@Benchmark
public Integer executeReduceOnSequentialStream() {
return this.userList
.stream()
.reduce(
0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}

В приведенном выше тесте JMH мы сравниваем среднее время выполнения. Мы просто создаем список , содержащий большое количество объектов User . Затем мы вызываем метод reduce() для последовательного и параллельного потоков и проверяем, что последний работает быстрее первого (в секундах на операцию).

Вот результаты наших тестов:

Benchmark                                                   Mode  Cnt  Score    Error  Units
JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op
JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op

5. Генерация и обработка исключений при сокращении

В приведенных выше примерах операция reduce() не создает никаких исключений. Но может, конечно.

Например, предположим, что нам нужно разделить все элементы потока на указанный коэффициент, а затем просуммировать их:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int divider = 2;
int result = numbers.stream().reduce(0, a / divider + b / divider);

Это будет работать, пока переменная делителя не равна нулю. Но если оно равно нулю, метод reduce() выдаст исключение ArithmeticException : разделить на ноль.

Мы можем легко перехватить исключение и сделать с ним что-то полезное, например зарегистрировать его, восстановиться и т. д., в зависимости от варианта использования, с помощью блока try/catch :

public static int divideListElements(List<Integer> values, int divider) {
return values.stream()
.reduce(0, (a, b) -> {
try {
return a / divider + b / divider;
} catch (ArithmeticException e) {
LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
}
return 0;
});
}

Хотя этот подход будет работать, мы загрязнили лямбда-выражение блоком try/catch . У нас больше нет той чистой однострочной строчки, которая была раньше.

Чтобы решить эту проблему, мы можем использовать технику рефакторинга функции извлечения и выделить блок try/catch в отдельный метод :

private static int divide(int value, int factor) {
int result = 0;
try {
result = value / factor;
} catch (ArithmeticException e) {
LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
}
return result
}

Теперь реализация метода DivisionListElements() снова чистая и упорядоченная:

public static int divideListElements(List<Integer> values, int divider) {
return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider));
}

Предполагая, что DiveListElements() — это служебный метод, реализованный абстрактным классом NumberUtils , мы можем создать модульный тест для проверки поведения метода DivideListElements() :

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);

Давайте также протестируем метод DivisionListElements() , когда предоставленный список значений Integer содержит 0:

List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);

Наконец, давайте проверим реализацию метода, когда делитель тоже равен 0:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Сложные пользовательские объекты

Мы также можем использовать Stream.reduce() с пользовательскими объектами, которые содержат не примитивные поля. Для этого нам нужно предоставить соответствующий идентификатор , аккумулятор и объединитель для типа данных .

Предположим, что наш Пользователь является частью веб-сайта обзора. Каждый из наших Пользователей может иметь один Рейтинг , который усредняется по множеству Отзывов .

Во-первых, давайте начнем с нашего объекта Review .

Каждый отзыв должен содержать простой комментарий и оценку:

public class Review {

private int points;
private String review;

// constructor, getters and setters
}

Затем нам нужно определить наш рейтинг, который будет содержать наши обзоры вместе с полем баллов . По мере добавления отзывов это поле будет соответственно увеличиваться или уменьшаться:

public class Rating {

double points;
List<Review> reviews = new ArrayList<>();

public void add(Review review) {
reviews.add(review);
computeRating();
}

private double computeRating() {
double totalPoints =
reviews.stream().map(Review::getPoints).reduce(0, Integer::sum);
this.points = totalPoints / reviews.size();
return this.points;
}

public static Rating average(Rating r1, Rating r2) {
Rating combined = new Rating();
combined.reviews = new ArrayList<>(r1.reviews);
combined.reviews.addAll(r2.reviews);
combined.computeRating();
return combined;
}

}

Мы также добавили функцию среднего значения для вычисления среднего значения на основе двух входных Rating s. Это будет хорошо работать для наших компонентов объединителя и аккумулятора .

Далее давайте определим список пользователей , у каждого из которых есть свои наборы обзоров:

User john = new User("John", 30);
john.getRating().add(new Review(5, ""));
john.getRating().add(new Review(3, "not bad"));
User julie = new User("Julie", 35);
john.getRating().add(new Review(4, "great!"));
john.getRating().add(new Review(2, "terrible experience"));
john.getRating().add(new Review(4, ""));
List<User> users = Arrays.asList(john, julie);

Теперь, когда учтены Джон и Джули, давайте воспользуемся Stream.reduce() для вычисления среднего рейтинга для обоих пользователей.

В качестве идентификатора давайте вернем новый рейтинг , если наш входной список пуст :

Rating averageRating = users.stream()
.reduce(new Rating(),
(rating, user) -> Rating.average(rating, user.getRating()),
Rating::average);

Если мы посчитаем, то обнаружим, что средний балл равен 3,6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Заключение

В этой статье мы узнали, как использовать операцию Stream.reduce() .

Кроме того, мы узнали, как выполнять редукции в последовательных и параллельных потоках и как обрабатывать исключения при редукции.

Как обычно, все примеры кода, показанные в этом руководстве, доступны на GitHub .