Перейти к основному содержимому

Фильтрация наблюдаемых в RxJava

· 9 мин. чтения

1. Введение

После введения в RxJava мы рассмотрим операторы фильтрации.

В частности, мы сосредоточимся на фильтрации, пропуске, временной фильтрации и некоторых более сложных операциях фильтрации.

2. Фильтрация

При работе с Observable иногда бывает полезно выбрать только подмножество испускаемых элементов. Для этой цели RxJava предлагает различные возможности фильтрации .

Начнем с метода фильтра .

2.1. Оператор фильтра _

Проще говоря, оператор filter фильтрует Observable , гарантируя, что испускаемые элементы соответствуют указанному условию , которое приходит в форме Predicate .

Давайте посмотрим, как мы можем отфильтровать только нечетные значения из выданных:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
.filter(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 3, 5, 7, 9);

2.2. Оператор взятия _

При фильтрации с помощью take логика приводит к выдаче первых n элементов при игнорировании остальных элементов.

Давайте посмотрим, как мы можем отфильтровать исходный Observable и выдать только первые два элемента:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.take(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.3. Оператор takeWhile _

При использовании takeWhile отфильтрованный Observable будет продолжать испускать элементы до тех пор, пока не встретит первый элемент, не соответствующий предикату.

Давайте посмотрим, как мы можем использовать takeWhile с фильтрующим предикатом:

Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
.takeWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.4. Оператор TakeFirst _

Всякий раз, когда мы хотим выдать только первый элемент, соответствующий заданному условию, мы можем использовать takeFirst().

Давайте быстро посмотрим, как мы можем сгенерировать первый элемент, который больше 5:

Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 7, 6);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
.takeFirst(x -> x > 5);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

2.5. операторы first и firstOrDefault

Аналогичного поведения можно добиться с помощью первого API:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.first();

filteredObservable.subscribe(subscriber);

subscriber.assertValue(1);

Однако, если мы хотим указать значение по умолчанию, если элементы не выдаются, мы можем использовать f irstOrDefault :

Observable<Integer> sourceObservable = Observable.empty();

Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.6. Оператор TakeLast _

Далее, если мы хотим выдать только последние n элементов, испущенных Observable , мы можем использовать takeLast .

Давайте посмотрим, как можно сгенерировать только последние три элемента:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.takeLast(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(8, 9, 10);

Мы должны помнить, что это задерживает выпуск любого элемента из исходного Observable до его завершения.

2.7. последний и последний или по умолчанию

Если мы хотим выдать только последний элемент, кроме использования takeLast(1) , мы можем использовать last .

Это фильтрует Observable , выдавая только последний элемент, который дополнительно проверяет предикат фильтрации :

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
.last(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(9);

В случае, если Observable пуст, мы можем использовать lastOrDefault , который фильтрует Observable , испуская значение по умолчанию.

Значение по умолчанию также выдается, если используется оператор lastOrDefault и нет элементов, проверяющих условие фильтрации:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable =
sourceObservable.lastOrDefault(-1, i -> i > 10);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.8. Операторы elementAt и elementAtOrDefault

С помощью оператора elementAt мы можем выбрать один элемент, испускаемый исходным Observable , указав его индекс:

Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.elementAt(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

Однако elementAt выдаст исключение IndexOutOfBoundException , если указанный индекс превысит количество созданных элементов.

Чтобы избежать этой ситуации, можно использовать elementAtOrDefault , который вернет значение по умолчанию, если индекс выйдет за пределы допустимого диапазона:

Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable
= sourceObservable.elementAtOrDefault(7, -1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.9. Оператор ofType _

Всякий раз, когда Observable испускает элементы Object , их можно фильтровать на основе их типа.

Давайте посмотрим, как мы можем фильтровать только испускаемые элементы типа String :

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
TestSubscriber subscriber = new TestSubscriber();

Observable filteredObservable = sourceObservable.ofType(String.class);

filteredObservable.subscribe(subscriber);

subscriber.assertValues("two", "five");

3. Пропуск

С другой стороны, когда мы хотим отфильтровать или пропустить некоторые элементы , выдаваемые Observable , RxJava предлагает несколько операторов в качестве аналога фильтров , которые мы обсуждали ранее.

Начнем с оператора skip , аналога take .

3.1. Оператор пропуска _

Когда Observable выдает последовательность элементов, можно отфильтровать или пропустить некоторые из первых выданных элементов с помощью skip .

Например. давайте посмотрим, как можно пропустить первые четыре элемента:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skip(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2. Оператор skipWhile _

Всякий раз, когда мы хотим отфильтровать все первые значения, выдаваемые Observable , которые не соответствуют предикату фильтрации, мы можем использовать оператор skipWhile :

Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
.skipWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3. Оператор skipLast _

Оператор skipLast позволяет нам пропускать конечные элементы, испускаемые Observable , принимая только те, которые были испущены до них.

При этом мы можем, например, пропустить последние пять элементов:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skipLast(5);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3, 4, 5);

3.4. отличные и отдельные операторыUntilChanged

Отдельный оператор возвращает Observable , который испускает все элементы, испускаемые sourceObservable , которые являются различными:

Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinct();

distinctObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

Однако, если мы хотим получить Observable , который испускает все элементы, испускаемые sourceObservable , которые отличаются от их непосредственного предшественника, мы можем использовать оператор differentUntilChanged :

Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();

distinctObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 1, 3, 1);

3.5. Оператор ignoreElements _

Всякий раз, когда мы хотим игнорировать все элементы, испускаемые sourceObservable , мы можем просто использовать ignoreElements:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();

ignoredObservable.subscribe(subscriber);

subscriber.assertNoValues();

4. Операторы временной фильтрации

При работе с наблюдаемой последовательностью ось времени неизвестна, но иногда может быть полезно получить своевременные данные из последовательности.

С этой целью RxJava предлагает несколько методов, которые позволяют нам работать с Observable , используя также ось времени .

Прежде чем перейти к первому, давайте определим Observable с синхронизацией , который будет испускать элемент каждую секунду:

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable = Observable
.just(1, 2, 3, 4, 5, 6)
.zipWith(Observable.interval(
0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

TestScheduler — это специальный планировщик, который позволяет вручную переводить часы в любом удобном для нас темпе.

4.1. образец и дроссельПоследние операторы

Пример оператора фильтрует timedObservable , возвращая Observable , который выдает самые последние элементы, выдаваемые этим API в течение временных интервалов периода . ``

Давайте посмотрим, как мы можем сэмплировать timedObservable , фильтруя только последний испускаемый элемент каждые 2,5 секунды:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> sampledObservable = timedObservable
.sample(2500L, TimeUnit.MILLISECONDS, testScheduler);

sampledObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(3, 5, 6);

Такого поведения можно добиться также с помощью оператора ThrottleLast .

4.2. ДроссельПервый оператор _

Оператор дросселяFirst отличается от дросселяLast/sample , так как он создает первый элемент, созданный timedObservable в каждом периоде выборки, а не последний созданный элемент.

Давайте посмотрим, как мы можем выдать первые элементы, используя период выборки 4 секунды:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
.throttleFirst(4100L, TimeUnit.SECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 6);

4.3. операторы debounce и ThrottleWithTimeout

С помощью оператора debounce можно создать элемент только в том случае, если в течение определенного промежутка времени не был создан другой элемент.

Следовательно, если мы выберем временной интервал, превышающий временной интервал между испускаемыми элементами timedObservable , он будет испускать только последний . С другой стороны, если он меньше, он будет испускать все элементы, испускаемые timedObservable.

Давайте посмотрим, что происходит в первом сценарии:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
.debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValue(6);

Такого поведения также можно добиться с помощью ThrottleWithTimeout .

4.4. Оператор тайм - аута

Оператор тайм -аута отражает исходный Observable , но выдает ошибку уведомления, прерывая выпуск элементов, если исходный Observable не выдает какие-либо элементы в течение указанного интервала времени.

Давайте посмотрим, что произойдет, если мы укажем тайм-аут в 500 миллисекунд для нашего timedObservable :

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
.timeout(500L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);

5. Множественная наблюдаемая фильтрация

При работе с Observable определенно можно решить, фильтровать или пропускать элементы на основе второго Observable .

Прежде чем двигаться дальше, давайте определим delayedObservable , который будет выдавать только 1 элемент через 3 секунды:

Observable<Integer> delayedObservable = Observable.just(1)
.delay(3, TimeUnit.SECONDS, testScheduler);

Начнем с оператора takeUntil .

5.1. Оператор takeUntil _

Оператор takeUntil отбрасывает любой элемент, испускаемый исходным Observable ( timedObservable ) после того, как второй Observable ( delayedObservable ) испускает элемент или завершается:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
.skipUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(4, 5, 6);

5.2. Оператор skipUntil _

С другой стороны, skipUntil отбрасывает любой элемент, испускаемый исходным Observable ( timedObservable ), пока второй Observable ( delayedObservable ) не выдает элемент:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
.takeUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 2, 3);

6. Заключение

В этом обширном руководстве мы рассмотрели различные операторы фильтрации, доступные в RxJava, и предоставили простой пример каждого из них.

Как всегда, все примеры кода в этой статье можно найти на GitHub .