- Scheduler
- See Also
- Language-Specific Information:
- RxClojure
- RxCpp
- RxGroovy
- Varieties of Scheduler
- Default Schedulers for RxGroovy Observable Operators
- Test Scheduler
- See Also
- RxJava 1․x
- Varieties of Scheduler
- Default Schedulers for RxJava 1.x Observable Operators
- Using Schedulers
- Recursive Schedulers
- Checking or Setting Unsubscribed Status
- Delayed and Periodic Schedulers
- Test Scheduler
- Основы реактивного программирования под Android на практическом примере
- 1. Введение в реактивное программирование
- 2. Подготовка окружения
- 3. Создаем базовый пример
- 4. Подключаем реактивность
- 5. Запускаем первое реактивное приложение
- 6. Наращиваем реактивность — использование операторов
- 7. Увеличиваем мощности
Scheduler
If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular .
Some ReactiveX Observable operators have variants that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.
By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.
As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn , on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.
See Also
Language-Specific Information:
RxClojure
RxCpp
RxGroovy
Varieties of Scheduler
You obtain a Scheduler from the factory methods described in the Schedulers class. The following table shows the varieties of Scheduler that are available to you by means of these methods in RxGroovy:
Scheduler | purpose |
---|---|
Schedulers.computation( ) | meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors |
Schedulers.from(executor) | uses the specified Executor as a Scheduler |
Schedulers.immediate( ) | schedules work to begin immediately in the current thread |
Schedulers.io( ) | meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ) ; Schedulers.io( ) by default is a CachedThreadScheduler , which is something like a new thread scheduler with thread caching |
Schedulers.newThread( ) | creates a new thread for each unit of work |
Schedulers.trampoline( ) | queues work to begin on the current thread after any already-queued work |
Default Schedulers for RxGroovy Observable Operators
Some Observable operators in RxGroovy have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. Others do not operate on any particular Scheduler, or operate on a particular default Scheduler. Those that have a particular default Scheduler include:
operator | Scheduler |
---|---|
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retry | trampoline |
sample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
Test Scheduler
The TestScheduler allows you to exercise fine-tuned manual control over how the Scheduler’s clock behaves. This can be useful for testing interactions that depend on precise arrangements of actions in time. This Scheduler has three additional methods:
advanceTimeTo(time,unit) advances the Scheduler’s clock to a particular point in time advanceTimeBy(time,unit) advances the Scheduler’s clock forward by a particular amount of time triggerActions( ) start any unstarted actions that have been scheduled for a time equal to or earlier than the present time according to the Scheduler’s clock
See Also
- Testing Reactive Applications by Ben Christensen
- RxJava Threading Examples by Graham Lea
- Advanced RxJava: Schedulers (part 1) (part 2) (part 3) (part 4) by Dávid Karnok
RxJava 1․x
Varieties of Scheduler
You obtain a Scheduler from the factory methods described in the Schedulers class. The following table shows the varieties of Scheduler that are available to you by means of these methods in RxJava:
Scheduler | purpose |
---|---|
Schedulers.computation( ) | meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors |
Schedulers.from(executor) | uses the specified Executor as a Scheduler |
Schedulers.immediate( ) | schedules work to begin immediately in the current thread |
Schedulers.io( ) | meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ) ; Schedulers.io( ) by default is a CachedThreadScheduler , which is something like a new thread scheduler with thread caching |
Schedulers.newThread( ) | creates a new thread for each unit of work |
Schedulers.trampoline( ) | queues work to begin on the current thread after any already-queued work |
Default Schedulers for RxJava 1.x Observable Operators
Some Observable operators in RxJava have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. Others do not operate on any particular Scheduler, or operate on a particular default Scheduler. Those that have a particular default Scheduler include:
operator | Scheduler |
---|---|
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retry | trampoline |
sample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
Using Schedulers
Aside from passing these Schedulers in to RxJava Observable operators, you can also use them to schedule your own work on Subscriptions. The following example uses the schedule method of the Scheduler.Worker class to schedule work on the newThread Scheduler:
Recursive Schedulers
To schedule recursive calls, you can use schedule and then schedule(this) on the Worker object:
Checking or Setting Unsubscribed Status
Objects of the Worker class implement the Subscription interface, with its isUnsubscribed and unsubscribe methods, so you can stop work when a subscription is cancelled, or you can cancel the subscription from within the scheduled task:
The Worker is also a Subscription and so you can (and should, eventually) call its unsubscribe method to signal that it can halt work and release resources:
Delayed and Periodic Schedulers
You can also use a version of schedule that delays your action on the given Scheduler until a certain timespan has passed. The following example schedules someAction to be performed on someScheduler after 500ms have passed according to that Scheduler’s clock:
Another Scheduler method allows you to schedule an action to take place at regular intervals. The following example schedules someAction to be performed on someScheduler after 500ms have passed, and then every 250ms thereafter:
Test Scheduler
The TestScheduler allows you to exercise fine-tuned manual control over how the Scheduler’s clock behaves. This can be useful for testing interactions that depend on precise arrangements of actions in time. This Scheduler has three additional methods:
advanceTimeTo(time,unit) advances the Scheduler’s clock to a particular point in time advanceTimeBy(time,unit) advances the Scheduler’s clock forward by a particular amount of time triggerActions( ) start any unstarted actions that have been scheduled for a time equal to or earlier than the present time according to the Scheduler’s clock
Источник
Основы реактивного программирования под Android на практическом примере
1. Введение в реактивное программирование
Разрабатывая сложное приложение под Android со множеством сетевых соединений, взаимодействием с пользователем и анимацией — означает писать код, который полон вложенных обратных вызовов. И по мере развития проекта такой код становится не только громоздким и трудно понимаемым, но также сложным в развитии, поддержке и подвержен множеством трудноуловимым ошибкам.
ReactiveX или функциональное реактивное программирование предлагает альтернативный подход, который позволяет значительно сократить код приложения и создавать изящные понимаемые приложения для управления асинхронными задачами и событиями. В реактивном программировании потребитель реагирует на данные, как они придут и распространяет изменения события в зарегистрированных наблюдателях.
RxJava — реализация ReactiveX с открытым исходным кодом на Java. Базовыми строительными блоками реактивного кода являются Observables и Subscribers. Подробнее с базовой основой можно ознакомиться в статье Грокаем* RxJava, часть первая: основы.
RxAndroid — расширение к RxJava, которое позволяет планировщику запускать код в основном и дополнительных потоках Android приложения и обеспечивает передачу результатов из созданных дополнительных потоках в основное для агрегации и взаимодействия с интерфейсом пользователя.
С целью более полного понимания основных принципов реактивного программирования рассмотрим практический пример для платформы Android. И начнем с настройки окружения для разработки.
2. Подготовка окружения
Подключаем основные библиотеки и прописываем зависимости в секции dependencies<> конфигурационного файла buil.gradle:
Подключаем поддержку лямбда-выражений — используем новые возможности языка Java 8 на платформе Android N. Чтобы использовать возможности языка Java 8 также необходимо подключить и новый компилятор Jack, для чего добавьте в файл build.gradle:
Примечание: Jack поддерживается только в Android Studio 2.1 и также необходимо выполнить обновление до JDK 8.
При внесении изменений в конфигурационном файле gradle появляется предупреждение о необходимости синхронизировать проект и, чтобы применить все изменения нажмите на ссылку Sync Now вверху-справа.
3. Создаем базовый пример
В связи с тем, что применение RxAndroid в большинстве случаев связано с проектами с много-поточной обработкой сетевых соединений — рассмотрим простой пример обработки результатов парсинга сайта.
Для отображения результатов создадим простой layout:
Для парсинга создадим простой класс WebParsing с двумя методами getURLs и getTitle:
Метод getURLs просматривает содержимое сайта и возвращает список всех найденных ссылок, а метод getTitle возвращает Title сайта по ссылке.
4. Подключаем реактивность
Для того, чтобы использовать возможности RxAndroid на основе приведенных выше методов создадим два соответствующих Observables:
Первый Observable будет порождать список URL ссылок, найденных на сайте, второй будет порождать Title. Разберем пример перового метода подробно и построчно:
- Observable
- queryURLs(String url) — строка объявляет Observable метод, который принимает в виде входного параметра ссылку на сайт для парсинга и возвращает результат парсинга в виде списка ссылок
- с указанного сайта;
WebParsing webParsing = new WebParsing() — создает переменную для доступа к нашим функциям парсинга;
return Observable.create — создает Observable, возвращающего список ссылок;
new Observable.OnSubscribe - () — строка объявляет интерфейс OnSubscribe с одним методом (см. ниже), который вызовется при подписке;
public void call(Subscriber subscriber) — перегружает метод call, который будет вызываться после подписки Subscriber;
subscriber.onNext(webParsing.getURLs(url)) — вызывает метод onNext для передачи данных Subscriber всякий раз, когда порождаются данные. Этот метод принимает в качестве параметра объект, испускаемый Observable;
subscriber.onCompleted() — Observable вызывает метод onCompleted() после того, как вызывает onNext в последний раз, если не было обнаружено никаких ошибок;
subscribeOn(Schedulers.io()) — метод subscribeOn подписывает всех Observable выше по цепочке на планировщик Schedulers.io();
observeOn(AndroidSchedulers.mainThread()) — метод observeOn позволяет получить результат в основном потоке приложения.
5. Запускаем первое реактивное приложение
Итак, Observables созданы, реализуем простейший пример на основе первого выше метода, который будет выводить список ссылок сайта:
Обернем наш реализуемый пример в класс MainExample и вызовем в MainActivity:
6. Наращиваем реактивность — использование операторов
Observable может трансформировать выходные данные с помощью операторов и они могут быть использованы в промежутке между Observable и Subscriber для манипуляции с данными. Операторов в RxJava очень много, поэтому для начала рассмотрим наиболее востребованные.
И начнем с того, что избавимся от цикла в подписчике и заставим наблюдателя последовательно испускать данные полученного массива ссылок, и поможет в этом нам оператор from():
Выглядит не совсем красиво и немного запутанно, поэтому применим следующий оператор flatMap(), который принимает на вход данные, излучаемые одним Observable, и возвращает данные, излучаемые другим Observable, подменяя таким образом один Observable на другой:
На следующем шаге еще разгрузим наш Subscriber и воспользуемся оператором map(), через который можно преобразовывать один элемент данных в другой. Оператор map() также может преобразовывать данные и порождать данные необходимого нам типа, отличного от исходного. В нашем случае наблюдатель будет формировать список строк, а подписчик только выведет их на экран:
Основные возможности мы рассмотрели и сейчас пришло время воспользоваться лямбдами, чтобы упростить наш код:
Сравним конструкцию выше с получившимся кодом и ощутим мощь и простоту лямбда-выражений.
7. Увеличиваем мощности
На следующем шаге усложним нашу обработку и воспользуемся оператором flatMap(), чтобы подключить второй подготовленный метод queryTitle(), также возвращающий наблюдателя. Этот метод возвращает Title сайта по ссылке на сайт. Создадим пример, в котором будем формировать и выводить список заголовков сайтов по ссылкам, найденным на веб-странице, т.е. вместо полученного списка ссылок на сайты в предыдущем примере выведем заголовки (Title) этих сайтов:
или в сокращенном виде:
добавляем map() для формирования списка заголовков:
с помощью оператора filter() отфильтровываем пустые строки со значением null:
с помощью оператора take() возьмем только первые 7 заголовков:
Последний пример показал, что объединение множества методов плюс использование большого количества доступных операторов плюс лямбда-выражения и мы получаем буквально из нескольких строк мощный обработчик потоков различных данных.
Все примеры, приведенные в статье выложены здесь.
Источник