1. Обзор
В этой статье мы сосредоточимся на различных типах планировщиков
, которые будем использовать при написании многопоточных программ на основе методов RxJava Observable
subscribeOn
иObservable . ``
Планировщики
дают возможность указать, где и, вероятно, когда выполнять задачи, связанные с работой цепочки Observable .
Мы можем получить Scheduler
из фабричных методов, описанных в классе Schedulers .
2. Поведение потоков по умолчанию
По умолчанию Rx является однопоточным , что означает, что Observable
и цепочка операторов, которые мы можем применить к нему, будут уведомлять своих наблюдателей в том же потоке, в котором вызывается его метод subscribe() .
МетодыObservOn и SubscribeOn
принимают
в качестве аргумента Планировщик,
который, как следует из названия, представляет собой инструмент, который мы можем использовать для планирования отдельных действий.
Мы создадим нашу реализацию Scheduler
, используя метод create
Worker
, который возвращает Scheduler.Worker .
Рабочий процесс
принимает действия и выполняет их последовательно в одном потоке.
В некотором смысле worker
сам
по себе является планировщиком, но мы не будем называть его планировщиком
, чтобы избежать путаницы.
2.1. Планирование действия
Мы можем запланировать задание в любом планировщике
, создав нового работника
и запланировав некоторые действия:
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
Assert.assertTrue(result.equals("action"));
Затем действие ставится в очередь в потоке, которому назначен рабочий процесс.
2.2. Отмена действия
Scheduler.Worker
расширяет подписку
. Вызов метода отмены подписки для
рабочего
процесса приведет к очистке очереди и отмене всех ожидающих выполнения задач. Мы можем видеть это на примере:
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += "First_Action";
worker.unsubscribe();
});
worker.schedule(() -> result += "Second_Action");
Assert.assertTrue(result.equals("First_Action"));
Вторая задача никогда не выполняется, потому что предыдущая отменила всю операцию. Действия, которые находились в процессе выполнения, будут прерваны.
3. Schedulers.newThread
Этот планировщик просто запускает новый поток каждый раз, когда его запрашивают через subscribeOn()
илиObservOn ()
.
Вряд ли это хороший выбор не только из-за задержки при запуске потока, но и из-за того, что этот поток не используется повторно:
Observable.just("Hello")
.observeOn(Schedulers.newThread())
.doOnNext(s ->
result2 += Thread.currentThread().getName()
)
.observeOn(Schedulers.newThread())
.subscribe(s ->
result1 += Thread.currentThread().getName()
);
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));
Когда Worker
завершается, поток просто завершается. Этот Планировщик
можно использовать только тогда, когда задачи являются крупнозернистыми: на выполнение уходит много времени, но их очень мало, так что повторное использование потоков вряд ли вообще возможно.
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_");
result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals(
"RxNewThreadScheduler-1_Start_End_worker_"));
Когда мы запланировали рабочий
процесс в NewThreadScheduler,
мы увидели, что рабочий процесс привязан к определенному потоку.
4. Планировщики.немедленно
Schedulers.immediate
— это специальный планировщик, который вызывает задачу в клиентском потоке блокирующим, а не асинхронным способом и возвращает результат, когда действие завершено:
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_");
result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals(
"main_Start_worker__End"));
На самом деле, подписка на Observable
через немедленный планировщик
обычно имеет тот же эффект, что и отсутствие подписки на какой-либо конкретный планировщик
:
Observable.just("Hello")
.subscribeOn(Schedulers.immediate())
.subscribe(s ->
result += Thread.currentThread().getName()
);
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));
5. Планировщики.батут
Планировщик трамплина
очень похож на немедленный , потому что он также
планирует
задачи в том же потоке, эффективно блокируя их. ``
Однако предстоящая задача выполняется после завершения всех ранее запланированных задач:
Observable.just(2, 4, 6, 8)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579"));
Immediate сразу же
вызывает заданную задачу, в то время как trampoline
ожидает завершения текущей задачи.
Рабочий процесс
батута
выполняет каждую задачу в потоке, запланировавшем первую задачу. Первый вызов schedule
блокируется до тех пор, пока очередь не будет очищена:
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "Start";
worker.schedule(() -> {
result += "_middleStart";
worker.schedule(() ->
result += "_worker_"
);
result += "_middleEnd";
});
result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result
.equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));
6. Планировщики.от
Планировщики
внутренне сложнее, чем исполнители
из java.util.concurrent
, поэтому требовалась отдельная абстракция.
Но поскольку они концептуально очень похожи, неудивительно, что существует оболочка, которая может превратить Executor
в Scheduler
, используя метод from
factory:
private ThreadFactory threadFactory(String pattern) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern)
.build();
}
@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements()
throws InterruptedException {
ExecutorService poolA = newFixedThreadPool(
10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(
10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Alfa");
subscriber.onNext("Beta");
subscriber.onCompleted();
});;
observable
.subscribeOn(schedulerA)
.subscribeOn(schedulerB)
.subscribe(
x -> result += Thread.currentThread().getName() + x + "_",
Throwable::printStackTrace,
() -> result += "_Completed"
);
Thread.sleep(2000);
Assert.assertTrue(result.equals(
"Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}
SchedulerB
используется в течение короткого промежутка времени, но едва ли запланирует новое действие в schedulerA
, который выполняет всю работу. Таким образом, несколько методов subscribeOn
не только игнорируются, но и создают небольшие накладные расходы.
7. Планировщики.io
Этот планировщик
похож на newThread,
за исключением того факта, что уже запущенные потоки перезапускаются и, возможно, могут обрабатывать будущие запросы.
Эта реализация работает аналогично ThreadPoolExecutor
из java.util.concurrent
с неограниченным пулом потоков. Каждый раз, когда запрашивается новый воркер
, либо запускается новый поток (и позже какое-то время простаивает), либо повторно используется простаивающий:
Observable.just("io")
.subscribeOn(Schedulers.io())
.subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxIoScheduler-2"));
Мы должны быть осторожны с неограниченными ресурсами любого рода — в случае медленных или не отвечающих внешних зависимостей, таких как веб-службы, планировщик ввода -
вывода
может запустить огромное количество потоков, что приведет к тому, что наше собственное приложение перестанет отвечать.
На практике использование Schedulers.io
почти всегда является лучшим выбором.
8. Планировщики.вычисления
`
Планировщик вычислений
по умолчанию ограничивает количество параллельно работающих потоков значением
availableProcessors() , как это указано в служебном классе Runtime.getRuntime
()` .
Таким образом, мы должны использовать планировщик вычислений,
когда задачи полностью связаны с процессором; то есть они требуют вычислительной мощности и не имеют блокирующего кода.
Он использует неограниченную очередь перед каждым потоком, поэтому, если задача запланирована, но все ядра заняты, она будет поставлена в очередь. Однако очередь непосредственно перед каждым потоком будет расти:
Observable.just("computation")
.subscribeOn(Schedulers.computation())
.subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxComputationScheduler-1"));
Если по какой-то причине нам нужно количество потоков, отличное от значения по умолчанию, мы всегда можем использовать системное свойство rx.scheduler.max-computation-threads
.
Выполняя меньше потоков, мы можем гарантировать, что одно или несколько ядер ЦП всегда простаивают, и даже при большой нагрузке пул вычислительных
потоков не перегружает сервер. Просто невозможно иметь больше вычислительных потоков, чем ядер.
9. Планировщики.тест
Этот планировщик
используется только в целях тестирования, и мы никогда не увидим его в рабочем коде. Его главным преимуществом является возможность опережать часы, имитируя прохождение времени произвольно:
List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable
.interval(1, TimeUnit.SECONDS, scheduler);
Observable.from(letters)
.zipWith(tick, (string, index) -> index + "-" + string)
.subscribeOn(scheduler)
.subscribe(subscriber);
subscriber.assertNoValues();
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");
scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(
subscriber.getOnNextEvents(),
hasItems("0-A", "1-B", "2-C"));
10. Планировщики по умолчанию
Некоторые операторы Observable
в RxJava имеют альтернативные формы, которые позволяют нам установить планировщик
, который оператор будет использовать для своей работы. Другие не работают с каким-либо конкретным планировщиком или не работают с конкретным
планировщиком
по умолчанию .
Например, оператор задержки
принимает восходящие события и отправляет их вниз по течению через заданное время. Очевидно, что он не может удерживать исходный поток в течение этого периода, поэтому он должен использовать другой планировщик
:
ExecutorService poolA = newFixedThreadPool(
10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
.delay(1, TimeUnit.SECONDS, schedulerA)
.subscribe(i -> result+= Thread.currentThread().getName() + i + " ");
Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));
Без предоставления пользовательского schedulerA
все операторы ниже delay
будут использовать планировщик вычислений
.
Другими важными операторами, поддерживающими настраиваемые планировщики
, являются buffer,
interval
, range
, timer
, skip
, take
, timeout
и некоторые другие. Если мы не предоставляем планировщик
таким операторам, используется планировщик вычислений
, который в большинстве случаев является безопасным по умолчанию.
11. Заключение
В действительно реактивных приложениях, для которых все длительные операции являются асинхронными, требуется очень мало потоков и, следовательно, планировщиков .
Освоение планировщиков необходимо для написания масштабируемого и безопасного кода с использованием RxJava. Разница между subscribeOn
иObservOn особенно
важна при высокой нагрузке, когда каждая задача должна выполняться именно тогда, когда мы ожидаем.
И последнее, но не менее важное: мы должны быть уверены, что планировщики
, используемые ниже по течению, могут справиться с нагрузкой , создаваемой планировщиками
выше по течению . Для получения дополнительной информации есть эта статья о обратном давлении .
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.