1. Обзор
В этой статье мы рассмотрим, как библиотека RxJava помогает нам справляться с противодавлением.
Проще говоря, RxJava использует концепцию реактивных потоков, вводя Observables,
на которые может подписаться один или несколько наблюдателей .
Работа с потенциально бесконечными потоками очень сложна, так как нам нужно столкнуться с проблемой противодавления.
Несложно попасть в ситуацию, когда Observable
выдает элементы быстрее, чем подписчик может их потреблять. Мы рассмотрим различные решения проблемы растущего буфера неиспользованных предметов.
2. Горячие наблюдаемые
против холодных наблюдаемых
Во-первых, давайте создадим простую потребительскую функцию, которая будет использоваться как потребитель элементов из Observables
, которые мы определим позже:
public class ComputeFunction {
public static void compute(Integer v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Наша функция calculate()
просто печатает аргумент. Здесь важно отметить, что вызывается метод Thread.sleep(1000)
— мы делаем это, чтобы эмулировать какую-то длительную задачу, которая заставит Observable
заполняться элементами быстрее, чем Observer
может их использовать.
У нас есть два типа Observables — Hot
и Cold
— которые совершенно разные, когда дело доходит до обработки противодавления.
2.1. Холодные наблюдаемые
Холодный Observable
испускает определенную последовательность элементов, но может начать испускать эту последовательность, когда его Наблюдатель
сочтет это удобным, и с любой скоростью, которую желает Наблюдатель
, не нарушая целостность последовательности. Cold Observable
предоставляет предметы ленивым способом.
Наблюдатель берет элементы только тогда, когда он готов обработать этот элемент, и элементы не нужно буферизовать в Observable , поскольку они запрашиваются
методом
извлечения. ``
Например, если вы создаете Observable
на основе статического диапазона элементов от одного до миллиона, этот Observable
будет выдавать одну и ту же последовательность элементов независимо от того, как часто эти элементы наблюдаются:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute);
Когда мы запускаем нашу программу, элементы будут вычисляться Observer
лениво и запрашиваться в режиме pull. Метод Schedulers.computation()
означает, что мы хотим запустить наш Observer
в пуле потоков вычислений в RxJava.
Вывод программы будет состоять из результата метода calculate()
, вызываемого для одного за другим элемента из Observable
:
compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...
Холодные наблюдаемые объекты
не нуждаются в какой-либо форме противодавления, потому что они работают по принципу вытягивания. Примеры элементов, испускаемых холодным Observable
, могут включать результаты запроса к базе данных, поиска файлов или веб-запроса.
2.2. Горячие наблюдаемые
Горячий Observable
начинает генерировать элементы и испускает их сразу же после их создания. Это противоречит вытягивающей модели обработки Cold Observables .
Hot Observable
испускает предметы в своем собственном темпе, и его наблюдатели должны не отставать.
Когда Observer
не может потреблять элементы так же быстро, как они создаются Observable
, их необходимо буферизовать или обрабатывать каким-либо другим способом, поскольку они будут заполнять память, что в конечном итоге приведет к OutOfMemoryException.
Давайте рассмотрим пример горячего Observable,
который производит 1 миллион элементов для конечного потребителя, который обрабатывает эти элементы. Когда методу calculate() в
Observer
требуется некоторое время для обработки каждого элемента, Observable
начинает заполнять память элементами, что приводит к сбою программы:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
IntStream.range(1, 1_000_000).forEach(source::onNext);
Запуск этой программы завершится с ошибкой MissingBackpressureException
, потому что мы не определили способ обработки перепроизводства Observable
.
Примеры элементов, испускаемых горячим Observable
, могут включать события мыши и клавиатуры, системные события или курсы акций.
3. Буферизация с перепроизводством Observable
Первый способ справиться с перепроизводством Observable
— определить своего рода буфер для элементов, которые не могут быть обработаны Observer.
Мы можем сделать это, вызвав метод buffer() :
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.buffer(1024)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
Определение буфера размером 1024 даст наблюдателю
некоторое время, чтобы догнать источник с перепроизводством. Буфер будет хранить элементы, которые еще не были обработаны.
Мы можем увеличить размер буфера, чтобы было достаточно места для производимых значений.
Обратите внимание, однако, что обычно это может быть только временным исправлением , поскольку переполнение все еще может произойти, если источник превышает прогнозируемый размер буфера.
4. Пакетирование выданных элементов
Мы можем группировать перепроизводимые элементы в окнах из N элементов.
Когда Observable создает
элементы быстрее, чем Observer
может их обработать, мы можем облегчить эту ситуацию, сгруппировав созданные элементы вместе и отправив пакет элементов в Observer
, который может обрабатывать набор элементов, а не элемент один за другим:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.window(500)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
Использование метода window()
с аргументом 500
укажет Observable
сгруппировать элементы в пакеты размером 500. Этот метод может уменьшить проблему перепроизводства Observable
, когда Observer
может обрабатывать пакет элементов быстрее, чем обработка элементов один за другим.
5. Пропуск элементов
Если некоторые из значений, выдаваемых Observable
, можно безопасно игнорировать, мы можем использовать выборку в течение определенного времени и дросселирующие операторы.
Методы sample()
и ThrottleFirst()
принимают продолжительность в качестве параметра:
- Метод
sample()
периодически просматривает последовательность элементов и выдает последний элемент, созданный в течение времени, указанного в качестве параметра. - Метод
ThrottleFirst()
выдает первый элемент, который был создан после продолжительности, указанной в качестве параметра.
Длительность — это время, по истечении которого из последовательности созданных элементов выбирается один конкретный элемент. Мы можем указать стратегию обработки противодавления, пропустив элементы:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.sample(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
Мы указали, что стратегией пропуска элементов будет метод sample() .
Нам нужен образец последовательности продолжительностью 100 миллисекунд. Этот элемент будет передан наблюдателю.
Помните, однако, что эти операторы только снижают скорость получения значения нижестоящим наблюдателем
и, таким образом, они все еще могут приводить к MissingBackpressureException
.
6. Обработка заполнения наблюдаемого
буфера
В случае, если наши стратегии выборки или пакетирования элементов не помогают с заполнением буфера ,
нам необходимо реализовать стратегию обработки случаев, когда буфер заполняется.
Нам нужно использовать метод onBackpressureBuffer()
, чтобы предотвратить исключение BufferOverflowException.
Метод onBackpressureBuffer()
принимает три аргумента: емкость буфера Observable
, метод, который вызывается при заполнении буфера, и стратегию обработки элементов, которые необходимо удалить из буфера. Стратегии переполнения находятся в классе BackpressureOverflow
.
Существует 4 типа действий, которые могут быть выполнены при заполнении буфера:
ON_OVERFLOW_ERROR —
это поведение по умолчанию, сигнализирующее обисключении BufferOverflowException
, когда буфер заполнен.ON_OVERFLOW_DEFAULT —
в настоящее время это то же самое, что иON_OVERFLOW_ERROR.
ON_OVERFLOW_DROP_LATEST
— если произойдет переполнение, текущее значение будет просто проигнорировано, и только старые значения будут доставлены после запроса нижестоящегоObserver .
ON_OVERFLOW_DROP_OLDEST
— удаляет самый старый элемент в буфере и добавляет к нему текущее значение
Давайте посмотрим, как указать эту стратегию:
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> {}, Throwable::printStackTrace);
Здесь наша стратегия обработки переполнения буфера заключается в удалении самого старого элемента в буфере и добавлении нового элемента, созданного Observable
.
Обратите внимание, что последние две стратегии вызывают разрыв в потоке, поскольку они пропускают элементы. Кроме того, они не будут сигнализировать об исключении BufferOverflowException
.
7. Удаление всех лишних элементов
Всякий раз, когда нижестоящий наблюдатель
не готов принять элемент, мы можем использовать метод onBackpressureDrop()
, чтобы удалить этот элемент из последовательности.
Мы можем думать об этом методе как о методе onBackpressureBuffer()
с емкостью буфера, установленной на ноль, со стратегией ON_OVERFLOW_DROP_LATEST.
Этот оператор полезен, когда мы можем безопасно игнорировать значения из исходного Observable
(например, движения мыши или текущие сигналы местоположения GPS), поскольку позже будут более актуальные значения:
Observable.range(1, 1_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.doOnNext(ComputeFunction::compute)
.subscribe(v -> {}, Throwable::printStackTrace);
Метод onBackpressureDrop()
устраняет проблему перепроизводства Observable
, но его следует использовать с осторожностью.
8. Заключение
В этой статье мы рассмотрели проблему перепроизводства Observable
и способы борьбы с противодавлением. Мы рассмотрели стратегии буферизации, группирования и пропуска элементов, когда Observer
не может потреблять элементы так же быстро, как они создаются Observable.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.