Перейти к основному содержимому

Как выйти из Java Stream forEach

· 4 мин. чтения

1. Обзор

Как разработчики Java, мы часто пишем код, который перебирает набор элементов и выполняет операцию над каждым из них. Библиотека потоков Java 8 и ее метод forEach позволяют нам писать этот код чистым, декларативным образом.

Хотя это похоже на циклы, нам не хватает эквивалента оператора break для прерывания итерации . Поток может быть очень длинным или потенциально бесконечным , и если у нас нет причин продолжать его обработку, мы бы предпочли прервать его, а не ждать его последнего элемента.

В этом руководстве мы рассмотрим некоторые механизмы, позволяющие моделировать оператор break в операции Stream.forEach .

2. Stream.takeWhile() в Java 9

Предположим, у нас есть поток элементов String , и мы хотим обрабатывать его элементы, если их длина нечетна.

Давайте попробуем метод Java 9 Stream.takeWhile :

Stream.of("cat", "dog", "elephant", "fox", "rabbit", "duck")
.takeWhile(n -> n.length() % 2 != 0)
.forEach(System.out::println);

Если мы запустим это, мы получим вывод:

cat
dog

Давайте сравним это с эквивалентным кодом на простой Java, используя цикл for и оператор break , чтобы понять, как это работает:

List<String> list = asList("cat", "dog", "elephant", "fox", "rabbit", "duck");
for (int i = 0; i < list.size(); i++) {
String item = list.get(i);
if (item.length() % 2 == 0) {
break;
}
System.out.println(item);
}

Как мы видим, метод takeWhile позволяет добиться именно того, что нам нужно.

Но что, если мы еще не приняли Java 9? Как мы можем добиться того же, используя Java 8?

3. Пользовательский сплитератор

Давайте создадим собственный Spliterator , который будет работать как декоратор для Stream.spliterator . Мы можем заставить этот Spliterator выполнять перерыв для нас.

Во-первых, мы получим Spliterator из нашего потока, затем украсим его нашим CustomSpliterator и обеспечим Predicate для управления операцией прерывания . Наконец, мы создадим новый поток из CustomSpliterator:

public static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<T> predicate) {
CustomSpliterator<T> customSpliterator = new CustomSpliterator<>(stream.spliterator(), predicate);
return StreamSupport.stream(customSpliterator, false);
}

Давайте посмотрим, как создать CustomSpliterator :

public class CustomSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

private Spliterator<T> splitr;
private Predicate<T> predicate;
private boolean isMatched = true;

public CustomSpliterator(Spliterator<T> splitr, Predicate<T> predicate) {
super(splitr.estimateSize(), 0);
this.splitr = splitr;
this.predicate = predicate;
}

@Override
public synchronized boolean tryAdvance(Consumer<? super T> consumer) {
boolean hadNext = splitr.tryAdvance(elem -> {
if (predicate.test(elem) && isMatched) {
consumer.accept(elem);
} else {
isMatched = false;
}
});
return hadNext && isMatched;
}
}

Итак, давайте взглянем на метод tryAdvance . Здесь мы видим, что пользовательский Spliterator обрабатывает элементы декорированного Spliterator . Обработка выполняется до тех пор, пока наш предикат соответствует и в начальном потоке все еще есть элементы. Когда любое из условий становится ложным , наш Spliterator «ломается» и операция потоковой передачи завершается.

Давайте протестируем наш новый вспомогательный метод:

@Test
public void whenCustomTakeWhileIsCalled_ThenCorrectItemsAreReturned() {
Stream<String> initialStream =
Stream.of("cat", "dog", "elephant", "fox", "rabbit", "duck");

List<String> result =
CustomTakeWhile.takeWhile(initialStream, x -> x.length() % 2 != 0)
.collect(Collectors.toList());

assertEquals(asList("cat", "dog"), result);
}

Как мы видим, поток остановился после того, как условие было выполнено. В целях тестирования мы собрали результаты в список, но мы также могли бы использовать вызов forEach или любую другую функцию Stream .

4. Кастом для каждого

Хотя предоставление Stream со встроенным механизмом прерывания может быть полезным, может быть проще сосредоточиться только на операции forEach .

Давайте воспользуемся Stream.spliterator напрямую без декоратора:

public class CustomForEach {

public static class Breaker {
private boolean shouldBreak = false;

public void stop() {
shouldBreak = true;
}

boolean get() {
return shouldBreak;
}
}

public static <T> void forEach(Stream<T> stream, BiConsumer<T, Breaker> consumer) {
Spliterator<T> spliterator = stream.spliterator();
boolean hadNext = true;
Breaker breaker = new Breaker();

while (hadNext && !breaker.get()) {
hadNext = spliterator.tryAdvance(elem -> {
consumer.accept(elem, breaker);
});
}
}
}

Как мы видим, новый пользовательский метод forEach вызывает BiConsumer , предоставляя нашему коду как следующий элемент, так и объект прерывателя, который он может использовать для остановки потока.

Давайте попробуем это в модульном тесте:

@Test
public void whenCustomForEachIsCalled_ThenCorrectItemsAreReturned() {
Stream<String> initialStream = Stream.of("cat", "dog", "elephant", "fox", "rabbit", "duck");
List<String> result = new ArrayList<>();

CustomForEach.forEach(initialStream, (elem, breaker) -> {
if (elem.length() % 2 == 0) {
breaker.stop();
} else {
result.add(elem);
}
});

assertEquals(asList("cat", "dog"), result);
}

5. Вывод

В этой статье мы рассмотрели способы предоставления эквивалента вызова break для потока. Мы увидели, как takeWhile в Java 9 решает большую часть проблем за нас и как предоставить его версию для Java 8.

Наконец, мы рассмотрели служебный метод, который может предоставить нам эквивалент операции прерывания при итерации Stream .

Как всегда, пример кода можно найти на GitHub .