Перейти к основному содержимому

RxJava и обработка ошибок

· 6 мин. чтения

1. Введение

В этой статье мы рассмотрим, как обрабатывать исключения и ошибки с помощью RxJava.

Во-первых, имейте в виду, что Observable обычно не генерирует исключений. Вместо этого по умолчанию Observable вызывает метод onError() своего наблюдателя , уведомляя наблюдателя о том, что только что произошла неисправимая ошибка, а затем завершает работу, не вызывая больше никаких методов своего наблюдателя .

Операторы обработки ошибок, которые мы собираемся представить, изменяют поведение по умолчанию, возобновляя или повторяя последовательность Observable .

2. Зависимости Maven

Во-первых, давайте добавим RxJava в pom.xml :

<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.3</version>
</dependency>

Последнюю версию артефакта можно найти здесь .

3. Обработка ошибок

Когда возникает ошибка, нам обычно нужно как-то ее обработать. Например, измените связанные внешние состояния, возобновив последовательность с результатами по умолчанию, или просто оставьте все как есть, чтобы ошибка могла распространяться.

3.1. Действия при ошибке

С помощью doOnError мы можем вызвать любое действие, которое необходимо при возникновении ошибки:

@Test
public void whenChangeStateOnError_thenErrorThrown() {
TestObserver testObserver = new TestObserver();
AtomicBoolean state = new AtomicBoolean(false);
Observable
.error(UNKNOWN_ERROR)
.doOnError(throwable -> state.set(true))
.subscribe(testObserver);

testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();

assertTrue("state should be changed", state.get());
}

В случае возникновения исключения при выполнении действия RxJava оборачивает исключение в CompositeException :

@Test
public void whenExceptionOccurOnError_thenCompositeExceptionThrown() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.doOnError(throwable -> {
throw new RuntimeException("unexcepted");
})
.subscribe(testObserver);

testObserver.assertError(CompositeException.class);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}

3.2. Возобновить с элементами по умолчанию

Хотя мы можем вызывать действия с помощью doOnError , ошибка все равно нарушает стандартный поток последовательности. Иногда мы хотим возобновить последовательность с опцией по умолчанию, что и делает onErrorReturnItem :

@Test
public void whenHandleOnErrorResumeItem_thenResumed(){
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorReturnItem("singleValue")
.subscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("singleValue");
}

Если предпочтительнее динамический поставщик элементов по умолчанию, мы можем использовать onErrorReturn :

@Test
public void whenHandleOnErrorReturn_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorReturn(Throwable::getMessage)
.subscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("unknown error");
}

3.3. Возобновить с другой последовательностью

Вместо того, чтобы возвращаться к одному элементу, мы можем предоставить резервную последовательность данных, используя onErrorResumeNext при обнаружении ошибок. Это поможет предотвратить распространение ошибки:

@Test
public void whenHandleOnErrorResume_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorResumeNext(Observable.just("one", "two"))
.subscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("one", "two");
}

Если резервная последовательность отличается в зависимости от конкретных типов исключений или последовательность должна быть сгенерирована функцией, мы можем передать функцию в onErrorResumeNext:

@Test
public void whenHandleOnErrorResumeFunc_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorResumeNext(throwable -> Observable
.just(throwable.getMessage(), "nextValue"))
.subscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("unknown error", "nextValue");
}

3.4. Обрабатывать только исключения

RxJava также предоставляет резервный метод, который позволяет продолжить последовательность с предоставленным Observable , когда возникает исключение (но не ошибка):

@Test
public void whenHandleOnException_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_EXCEPTION)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("exceptionResumed");
}

@Test
public void whenHandleOnException_thenNotResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);

testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
}

Как видно из приведенного выше кода, при возникновении ошибки onExceptionResumeNext не срабатывает для возобновления последовательности.

4. Повторите попытку при ошибке

Нормальная последовательность может быть нарушена временным сбоем системы или ошибкой серверной части. В этих ситуациях мы хотим повторить попытку и подождать, пока последовательность не будет исправлена.

К счастью, RxJava дает нам возможность сделать именно это.

4.1. Повторить попытку

При использовании retry Observable будет переподписываться бесконечное количество раз до тех пор, пока не возникнет ошибка . Но в большинстве случаев мы бы предпочли фиксированное количество повторных попыток: ``

@Test
public void whenRetryOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry(1)
.subscribe(testObserver);

testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should try twice", atomicCounter.get() == 2);
}

4.2. Повторить при условии

Условный повтор также возможен в RxJava, используя повтор с предикатами или используя retryUntil :

@Test
public void whenRetryConditionallyOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry((integer, throwable) -> integer < 4)
.subscribe(testObserver);

testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}

@Test
public void whenRetryUntilOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(UNKNOWN_ERROR)
.retryUntil(() -> atomicCounter.incrementAndGet() > 3)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}

4.3. Повторить когда

Помимо этих основных опций, есть также интересный метод повторной попытки: retryWhen .

Это возвращает Observable, скажем , «NewO», который выдает те же значения, что и исходный ObservableSource , скажем, «OldO», но если возвращаемый Observable «NewO» вызывает onComplete или onError , то onComplete или onError подписчика будут вызваны.

И если «NewO» выдает какой-либо элемент, будет активирована повторная подписка на исходный ObservableSource «OldO» .

Приведенные ниже тесты показывают, как это работает:

@Test
public void whenRetryWhenOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
Exception noretryException = new Exception("don't retry");
Observable
.error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> Observable.error(noretryException))
.subscribe(testObserver);

testObserver.assertError(noretryException);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}

@Test
public void whenRetryWhenOnError_thenCompleted() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.empty())
.subscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should not retry", atomicCounter.get()==0);
}

@Test
public void whenRetryWhenOnError_thenResubscribed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.just("anything"))
.subscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should retry once", atomicCounter.get()==1);
}

Типичное использование retryWhen ограничено повторными попытками с переменной задержкой:

@Test
public void whenRetryWhenForMultipleTimesOnError_thenResumed() {
TestObserver testObserver = new TestObserver();
long before = System.currentTimeMillis();
Observable
.error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> throwableObservable
.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
.flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS)))
.blockingSubscribe(testObserver);

testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
long secondsElapsed = (System.currentTimeMillis() - before)/1000;
assertTrue("6 seconds should elapse",secondsElapsed == 6 );
}

Обратите внимание, как эта логика повторяет три попытки и постепенно откладывает каждую повторную попытку.

5. Резюме

В этой статье мы представили несколько способов обработки ошибок и исключений в RxJava.

Есть также несколько специфичных для RxJava исключений, связанных с обработкой ошибок — более подробную информацию можно найти на официальной вики .

Как всегда, полную реализацию можно найти на Github .