1. Введение
После введения в RxJava мы рассмотрим операторы фильтрации.
В частности, мы сосредоточимся на фильтрации, пропуске, временной фильтрации и некоторых более сложных операциях фильтрации.
2. Фильтрация
При работе с Observable
иногда бывает полезно выбрать только подмножество испускаемых элементов. Для этой цели RxJava предлагает различные возможности фильтрации .
Начнем с метода фильтра .
2.1. Оператор фильтра
_
Проще говоря, оператор filter фильтрует
Observable
, гарантируя, что испускаемые элементы соответствуют указанному условию , которое приходит в форме Predicate
.
Давайте посмотрим, как мы можем отфильтровать только нечетные значения из выданных:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.filter(i -> i % 2 != 0);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 3, 5, 7, 9);
2.2. Оператор взятия
_
При фильтрации с помощью take
логика приводит к выдаче первых n
элементов при игнорировании остальных элементов.
Давайте посмотрим, как мы можем отфильтровать исходный
Observable
и выдать только первые два элемента:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.take(3);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
2.3. Оператор takeWhile
_
При использовании takeWhile
отфильтрованный Observable
будет продолжать испускать элементы до тех пор, пока не встретит первый элемент, не соответствующий предикату.
Давайте посмотрим, как мы можем использовать takeWhile
с фильтрующим предикатом:
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.takeWhile(i -> i < 4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
2.4. Оператор TakeFirst
_
Всякий раз, когда мы хотим выдать только первый элемент, соответствующий заданному условию, мы можем использовать takeFirst().
Давайте быстро посмотрим, как мы можем сгенерировать первый элемент, который больше 5:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 7, 6);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.takeFirst(x -> x > 5);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(7);
2.5. операторы first
и firstOrDefault
Аналогичного поведения можно добиться с помощью первого
API:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.first();
filteredObservable.subscribe(subscriber);
subscriber.assertValue(1);
Однако, если мы хотим указать значение по умолчанию, если элементы не выдаются, мы можем использовать f
irstOrDefault
:
Observable<Integer> sourceObservable = Observable.empty();
Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.6. Оператор TakeLast
_
Далее, если мы хотим выдать только последние n
элементов, испущенных Observable
, мы можем использовать takeLast
.
Давайте посмотрим, как можно сгенерировать только последние три элемента:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.takeLast(3);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(8, 9, 10);
Мы должны помнить, что это задерживает выпуск любого элемента из исходного Observable
до его завершения.
2.7. последний
и последний или по умолчанию
Если мы хотим выдать только последний элемент, кроме использования takeLast(1)
, мы можем использовать last
.
Это фильтрует Observable
, выдавая только последний элемент, который дополнительно проверяет предикат
фильтрации :
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.last(i -> i % 2 != 0);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(9);
В случае, если Observable
пуст, мы можем использовать lastOrDefault
, который фильтрует Observable
, испуская значение по умолчанию.
Значение по умолчанию также выдается, если используется оператор lastOrDefault
и нет элементов, проверяющих условие фильтрации:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable =
sourceObservable.lastOrDefault(-1, i -> i > 10);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.8. Операторы elementAt
и elementAtOrDefault
С помощью оператора elementAt
мы можем выбрать один элемент, испускаемый исходным Observable
, указав его индекс:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.elementAt(4);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(7);
Однако elementAt
выдаст исключение IndexOutOfBoundException
, если указанный индекс превысит количество созданных элементов.
Чтобы избежать этой ситуации, можно использовать elementAtOrDefault
, который вернет значение по умолчанию, если индекс выйдет за пределы допустимого диапазона:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable
= sourceObservable.elementAtOrDefault(7, -1);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.9. Оператор ofType
_
Всякий раз, когда Observable
испускает элементы Object
, их можно фильтровать на основе их типа.
Давайте посмотрим, как мы можем фильтровать только испускаемые элементы типа String :
Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
TestSubscriber subscriber = new TestSubscriber();
Observable filteredObservable = sourceObservable.ofType(String.class);
filteredObservable.subscribe(subscriber);
subscriber.assertValues("two", "five");
3. Пропуск
С другой стороны, когда мы хотим отфильтровать или пропустить некоторые элементы , выдаваемые Observable
, RxJava предлагает несколько операторов в качестве аналога фильтров , которые мы обсуждали ранее.
Начнем с оператора skip , аналога
take
.
3.1. Оператор пропуска
_
Когда Observable
выдает последовательность элементов, можно отфильтровать или пропустить некоторые из первых выданных элементов с помощью skip
.
Например. давайте посмотрим, как можно пропустить первые четыре элемента:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.skip(4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(5, 6, 7, 8, 9, 10);
3.2. Оператор skipWhile
_
Всякий раз, когда мы хотим отфильтровать все первые значения, выдаваемые Observable
, которые не соответствуют предикату фильтрации, мы можем использовать оператор skipWhile
:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.skipWhile(i -> i < 4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(4, 5, 4, 3, 2, 1);
3.3. Оператор skipLast
_
Оператор skipLast
позволяет нам пропускать конечные элементы, испускаемые Observable
, принимая только те, которые были испущены до них.
При этом мы можем, например, пропустить последние пять элементов:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.skipLast(5);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3, 4, 5);
3.4. отличные
и отдельные
операторыUntilChanged
Отдельный оператор возвращает Observable
, который испускает все элементы, испускаемые sourceObservable ,
которые
являются различными:
Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> distinctObservable = sourceObservable.distinct();
distinctObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
Однако, если мы хотим получить Observable
, который испускает все элементы, испускаемые sourceObservable
, которые отличаются от их непосредственного предшественника, мы можем использовать оператор differentUntilChanged :
Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();
distinctObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 1, 3, 1);
3.5. Оператор ignoreElements
_
Всякий раз, когда мы хотим игнорировать все элементы, испускаемые sourceObservable
, мы можем просто использовать ignoreElements:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();
ignoredObservable.subscribe(subscriber);
subscriber.assertNoValues();
4. Операторы временной фильтрации
При работе с наблюдаемой последовательностью ось времени неизвестна, но иногда может быть полезно получить своевременные данные из последовательности.
С этой целью RxJava предлагает несколько методов, которые позволяют нам работать с Observable
, используя также ось времени .
Прежде чем перейти к первому, давайте определим Observable
с синхронизацией , который будет испускать элемент каждую секунду:
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable = Observable
.just(1, 2, 3, 4, 5, 6)
.zipWith(Observable.interval(
0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);
TestScheduler — это специальный планировщик, который позволяет вручную переводить часы в
любом удобном для нас темпе.
4.1. образец
и дроссельПоследние
операторы
Пример
оператора фильтрует timedObservable , возвращая Observable
, который выдает самые последние элементы, выдаваемые этим API в течение временных интервалов периода .
``
Давайте посмотрим, как мы можем сэмплировать timedObservable
, фильтруя только последний испускаемый элемент каждые 2,5 секунды:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> sampledObservable = timedObservable
.sample(2500L, TimeUnit.MILLISECONDS, testScheduler);
sampledObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(3, 5, 6);
Такого поведения можно добиться также с помощью оператора ThrottleLast .
4.2. ДроссельПервый
оператор _
Оператор дросселяFirst
отличается от дросселяLast/sample
, так как он создает первый элемент, созданный timedObservable
в каждом периоде выборки, а не последний созданный элемент.
Давайте посмотрим, как мы можем выдать первые элементы, используя период выборки 4 секунды:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.throttleFirst(4100L, TimeUnit.SECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(1, 6);
4.3. операторы debounce
и ThrottleWithTimeout
С помощью оператора debounce
можно создать элемент только в том случае, если в течение определенного промежутка времени не был создан другой элемент.
Следовательно, если мы выберем временной интервал, превышающий временной интервал между испускаемыми элементами timedObservable
, он будет испускать только последний .
С другой стороны, если он меньше, он будет испускать все элементы, испускаемые timedObservable.
Давайте посмотрим, что происходит в первом сценарии:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValue(6);
Такого поведения также можно добиться с помощью ThrottleWithTimeout
.
4.4. Оператор тайм
- аута
Оператор тайм
-аута отражает исходный Observable
, но выдает ошибку уведомления, прерывая выпуск элементов, если исходный Observable
не выдает какие-либо элементы в течение указанного интервала времени.
Давайте посмотрим, что произойдет, если мы укажем тайм-аут в 500 миллисекунд для нашего timedObservable
:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.timeout(500L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);
5. Множественная наблюдаемая фильтрация
При работе с Observable
определенно можно решить, фильтровать или пропускать элементы на основе второго Observable
.
Прежде чем двигаться дальше, давайте определим delayedObservable
, который будет выдавать только 1 элемент через 3 секунды:
Observable<Integer> delayedObservable = Observable.just(1)
.delay(3, TimeUnit.SECONDS, testScheduler);
Начнем с оператора takeUntil
.
5.1. Оператор takeUntil
_
Оператор takeUntil
отбрасывает любой элемент, испускаемый исходным Observable
( timedObservable
) после того, как второй Observable
( delayedObservable
) испускает элемент или завершается:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.skipUntil(delayedObservable);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(4, 5, 6);
5.2. Оператор skipUntil
_
С другой стороны, skipUntil
отбрасывает любой элемент, испускаемый исходным Observable
( timedObservable
), пока второй Observable
( delayedObservable
) не выдает элемент:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.takeUntil(delayedObservable);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(1, 2, 3);
6. Заключение
В этом обширном руководстве мы рассмотрели различные операторы фильтрации, доступные в RxJava, и предоставили простой пример каждого из них.
Как всегда, все примеры кода в этой статье можно найти на GitHub .