- Can’t use AndroidSchedulers.mainThread() after migrating to RxJava2? #357
- Comments
- AruLNadhaN commented Jan 1, 2017
- artem-zinnatullin commented Jan 1, 2017
- AruLNadhaN commented Jan 1, 2017
- akarnokd commented Jan 1, 2017
- JakeWharton commented Jan 1, 2017
- Buggyy commented May 25, 2017
- ronanhardiman commented Jul 14, 2017
- beblank commented Jul 14, 2017 •
- sarathraj-coder commented Aug 24, 2017
- tir38 commented Sep 19, 2017
- abiemann commented Nov 21, 2017
- aakashbhusal7 commented Nov 28, 2017
- Виды Schedulers в RxJava2.0
- Schedulers.io()
- Schedulers.computation()
- Schedulers.newThread()
- Schedulers.single()
- Schedulers.trampoline()
- AndroidSchedulers.mainThread()
- ExecutorService
- Методы Schedulers.start() и Schedulers.shutdown()
- Многопоточное программирование в Android с использованием RxJava 2
- Почему реактивное программирование?
- Никаких больше обратных вызовов
- Простой контроль ошибок
- Очень простое использование многопоточности
- RxJava НЕ многопоточна по умолчанию
- Простой пример
- Подружимся с планировщиками (Schedulers)
- Schedulers.io()
- Schedulers.computation()
- Schedulers.newThread()
- Schedulers.single()
- Schedulers.from(Executor executor)
- AndroidSchedulers.mainThread()
- Понимание subscribeOn() и observeOn()
- subscribeOn()
- Под капотом
- observeOn()
- Под капотом
- Резюме
Can’t use AndroidSchedulers.mainThread() after migrating to RxJava2? #357
Comments
AruLNadhaN commented Jan 1, 2017
After migrating to RxJava2. observeOn doesn’t accept AndroidSchedulers .
Is there a alternate in Schedulers for AndroidSchedulers.mainThread() or how should i solve this issue ?
OLD versions
RxAndroid — 1.2.1
RxJava — 1.1.6
Migrated Versions
RxAndroid — 2.0.1
RxJava — 2.0.3
The text was updated successfully, but these errors were encountered:
artem-zinnatullin commented Jan 1, 2017
This has nothing to do with order. Just use 2.x branch of RxAndroid https://github.com/ReactiveX/RxAndroid/blob/2.x/README.md#binaries
AruLNadhaN commented Jan 1, 2017
I’m already using 2.x branch. These are my dependencies.
akarnokd commented Jan 1, 2017
Something else could still import RxJava 1 and you use that API. RxJava 1 and 2 are not binary compatible and v2 is not a drop-in replacement. You have to update your imports and change certain class and method names. See the wiki for futher details.
JakeWharton commented Jan 1, 2017
Yep. You’re importing the wrong class as Artem said. Correct the import or use the fully qualified name to point at the 2.x version of this class.
Buggyy commented May 25, 2017
compile ‘io.reactivex:rxandroid:1.2.1’
compile ‘io.reactivex:rxjava:1.1.6’
ronanhardiman commented Jul 14, 2017
the same error:
compile ‘io.reactivex.rxjava2:rxjava:2.1.1’ compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’
try
compile ‘io.reactivex.rxjava2:rxjava:2.1.0
beblank commented Jul 14, 2017 •
same error
compile «io.reactivex.rxjava2:rxjava:2.1.1»
compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’
bugs gone
use import io.reactivex.Observable
sarathraj-coder commented Aug 24, 2017
This is working
// RX
rxjava2_version = ‘2.0.1’
compile «io.reactivex.rxjava2:rxjava:$rxjava2_version»
compile «io.reactivex.rxjava2:rxandroid:$rxjava2_version»
and sample code is
import statements are
tir38 commented Sep 19, 2017
Be careful with
RxJava2 and RxAndroid(2) don’t always deploy updates in-sync (e.g. right now latest RxJava is 2.1.3 but latest RxAndroid is 2.0.1)
abiemann commented Nov 21, 2017
replace
import rx.Observable;
with
import io.reactivex.Observable;
aakashbhusal7 commented Nov 28, 2017
its not working..i get the error: cannot resolve method subscribeOn(rx.scheduler)
import com.example.abhus.listingapp.asset.JsonService;
import com.example.abhus.listingapp.model.Post;
import com.example.abhus.listingapp.views.DetailedActivity;
import rx.Observer;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
public class DetailPresenter <
DetailedActivity mView;
JsonService mForum;
Источник
Виды Schedulers в RxJava2.0
Безусловно, одним из преимуществ использования RxJava в Android – это легкий способ менять потоки выполнения для различных задач, занимающих много времени. Такие задачи как: сетевой запрос к серверу, парсинг JSON, маппинг данных из VO в DTO, сохранение данных в БД или на диск довольно ресурсоёмки и занимают много времени, и, чтобы сделать приложение более отзывчивым и приятным для пользователя, необходимо выносить всю “тяжёлую” работу для обработки в отдельный поток. Для этого в RxJava есть обширный набор, так называемых Шедулеров (Schedulers), которые позволяют обрабатывать данные в других потоках. Как ими пользоваться и какие между ними отличия – поговорим в этой статье.
Schedulers.io()
Schedulers.io() – наиболее распространенный вид scheduler’a. Этот тип используется для сетевых запросов, операций чтения данных из файла, записи или чтения данных из БД. Такой планировщик (scheduler) создаёт необходимое количество потоков для эффективного выполнения вашей задачи, удаляет ненужные, и старается переиспользовать потоки, которые могут понадобится. Важно осозновать, что количество созданных потоков не ограничено, соответственно есть опасность создания большого количества потоков и нехватки памяти. Неиспользуемый threads (потоки) очищаются после 60 секунд неактивности.
Если вы хотите научиться использовать продвинутые операторы RxJava, научиться объединять запросы и обрабатывать ошибки, то приглашаю вас на следующий поток интенсива по Android-разработке на Kotlin. Мы рассмотрим продвинутые операторы: zip, merge, concat, combineLatest, concatMap и другие, научимся использовать Subject и обуздаем Backpressure. Подробнее 👈
Schedulers.computation()
Schedulers.computation() – этот вид планировщика используется для математичсеких вычислений.Такой тип планировщика удобен тем, что создает количество потоков, соответствующее числу ядер в процессоре для параллельной обработки.Если вы не знаете какой тип планировщика вам подходит, то можете использовать Schedulers.computation() в качестве дефолтного. Многие операторы RxJava используют по умолчанию как раз Schedulers.computation()
Schedulers.newThread()
Schedulers.newThread() – создаёт новый поток для каждого активного Observable, не пытаясь переиспользовать ранее созданные потоки. Важно понимать, что в сложных задачах количество созданных threads не ограничено, и можно “уронить” приложение из-за нехватки памяти.
Schedulers.single()
Schedulers.single() – такой планировщик можно рассматривать, если вам необходимо выполнить всю работу последовательно в одном потоке. Такой тип планировщика может быть полезен если необходимо выполнить потоконебезопасный код в отдельном потоке.
Schedulers.trampoline()
Schedulers.trampoline() – выполняет код в текущем потоке, используется для последовательного выполнения задач
AndroidSchedulers.mainThread()
AndroidSchedulers.mainThread() – является частью rxAndroid и используется для выполнения задач в UI-потоке, например для модификации UI. Обычно используется вместе с observeOn
ExecutorService
Если ни один из вышеперечисленных видов планировщиков не подошел, то вы можете использовать собственный, создав его с помощью ExecutorService.
Методы Schedulers.start() и Schedulers.shutdown()
Данные методы используются соответсвенно для инициализации планировщика и его очищения. Метод Schedulers.shutdown() перестанет выполнять любые задачи и для того, чтобы код снова выполнялся на отдельном потоке, необходимо реинициализировать планировщик с помощью Schedulers.start()
Теперь, рассмотрев разные виды планировщиков, вы можете эффективнее управлять потоками с помощью RxJava. Кстати у нас есть бесплатный базовый курс по RxJava3.0. Ну а если хотите овладеть RxJava профессионально на практике и применять данную библиотеку в своих Android-приложениях, то приглашаю пройти онлайн – интенсив по Android-разработке с наставником, где вы прокачаетесь до middle за 2 месяца
- 📌 Онлайн — интенсив по Android-разработке с code review
- 📌 Telegram — канал @android_school_ru где публикуются полезные материалы для Android
- 📌 Instagram– карьерные лайфхаки, прямые эфиры с разработчиками, анонсы интенсивов
Источник
Многопоточное программирование в Android с использованием RxJava 2
Если вы новичок в общении с RxJava или пытались разобраться в этом, но не довели дело до конца, то ниже вы найдете для себя кое-что новое.
Оригинал статьи написан 29 ноября 2017. Перевод вольный.
Нам в GO-JEK требуется выполнять большое количество асинхронных операций в приложениях и мы не можем позволить себе идти на компромиссы в ущерб скорости работы и плавности пользовательского интерфейса.
Написание сложных многопоточных Android приложений может быть достаточно трудоемким процессом, который время от времени будет вас сильно перегружать из-за необходимости заботиться о большом количестве связанных друг с другом вещей. Это и многие другие причины убедили нас использовать RxJava в разрабатываемых Android приложениях.
В этой статье мы поговорим о том как мы использовали реальные возможности работы с многопоточностью в RxJava для того, чтобы сделать процесс разработки приложения максимально простым, легким и веселым. Во всех примерах кода ниже будет использоваться RxJava 2, но описанные концепции можно будет применять и в других реактивных расширениях.
Почему реактивное программирование?
Каждая статья о реактивном программировании начинается с такого обязательного блока и мы не нарушим эту традицию. Существует несколько преимуществ использования реактивного подхода к построению Android приложений, обратим внимание на те, которые вам действительно нужны.
Никаких больше обратных вызовов
Если вы давно разрабатываете под Android, то, должно быть, заметили, как быстро вещи становятся чересчур сложными и неподконтрольными с использованием вложенных обратных вызовов.
Это происходит, когда вы выполняете несколько асинхронных операций последовательно и хотите, чтобы дальнейшие действия зависели от результата предыдущих операций. Почти сразу код становится слишком перегруженным и сложным для поддержки.
Простой контроль ошибок
В императивном мире, в ситуации когда выполняется множество сложных асинхронных операций, ошибки могут возникать в большом количестве мест. И в каждом месте вы должны обрабатывать эти ошибки, в результате появляется много повторяющегося шаблонного кода, методы становятся громоздкими.
Очень простое использование многопоточности
Все мы знаем (и тайно признаем) насколько иногда сложной может быть работа с многопоточностью в Java. Например, выполнение части кода в фоновом потоке и возврат результата обратно в главный поток. Это только звучит просто, но на практике появляется много подводных камней, которые нужно обходить.
RxJava делает безумно легким выполнение нескольких сложных операций в любом потоке на ваш выбор, заботясь о корректной синхронизации и позволяя без проблем переключаться между потоками.
Преимущества RxJava бесконечны. Мы можем говорить об этом часами и адски утомить вас, но вместо этого давайте копнем глубже и начнем изучать реальную работу с многопоточностью в RxJava.
RxJava НЕ многопоточна по умолчанию
Да, вы прочли всё верно. RxJava по умолчанию не многопоточна в любом случае. Определение, данное для RxJava на официальном сайте, выглядит примерно следующим образом:
«Библиотека для составления асинхронных и основанных на событиях программ с использованием последовательностей (observable sequences) для виртуальной Java машины».
Увидев слово «асинхронных», многие люди ошибочно полагают, что RxJava многопоточна по умолчанию. Да, RxJava поддерживает многопоточность, предлагает множество мощных возможностей для легкой работы с асинхронными операциями, но это не значит что поведение RxJava по умолчанию многопоточно.
Если вы уже немного работали с RxJava, то её знаете базовые конструкции:
- Наблюдаемый источник (source Observable), далее
- несколько операторов (Operators), затем
- целевой подписчик (Subscriber)
Если вы запустите данный пример кода, то ясно увидите, что все действия выполняются в основном потоке приложения (проследите за именами потоков в логе в консоли). Этот пример показывает, что по умолчанию поведение RxJava блокирующее. Всё выполняется в том же потоке, в котором вызван код.
Бонус: Интересно, что же делает doOnNext() ? Это не что иное, как side-effect оператор. Он помогает внедряться в цепочку объектов observable и выполнять грязные (impure) операции. Например, внедрять дополнительный код в цепочке вызовов для отладки. Прочитать больше можно здесь.
Простой пример
Для того, чтобы начать работать с многопоточностью с применением RxJava необходимо познакомиться с базовыми классами и методами, такими как Schedulers, observeOn/subscribeOn.
Давайте рассмотрим один из самых простых примеров. Допустим, мы хотим получить список объектов Book сетевым запросом и показать его в основном потоке приложения. Довольно общий и понятный пример для начала.
Здесь мы видим метод getBooks() , который осуществляет сетевой вызов и собирает список книг для нас. Сетевой вызов занимает время (несколько миллисекунд или секунд), поэтому мы используем subscribeOn() и указываем планировщик Schedulers.io() для выполнения операции в потоке ввода-вывода.
Также мы используем оператор observeOn() вместе с планировщиком AndroidSchedulers.mainThread() для того, чтобы обрабатывать результат в основном потоке и показать список книг в пользовательском интерфейсе приложения.
Не волнуйтесь, скоро мы перейдем к более продвинутым вещам. Этот пример был предназначен только для того, чтобы вспомнить базовые понятия, прежде чем погрузиться глубже.
Подружимся с планировщиками (Schedulers)
RxJava предоставляет мощный набор планировщиков. Вы не можете получить прямой доступ к потокам или управлять ими. Если вам нужно работать с потоками, то необходимо воспользоваться встроенными планировщиками.
Можете представлять планировщики как потоки или пулы потоков (коллекции потоков) для выполнения разного рода задач.
Говоря проще, если вам нужно выполнить задачу в отдельном потоке — необходимо использовать верный планировщик, который возьмёт поток из своего пула доступных потоков и выполнит в нём задачу.
В RxJava доступны несколько типов планировщиков. Самая сложная часть — выбрать верный планировщик для вашей задачи. Задача никогда не будет выполняться оптимально, если вы не выберете верный планировщик. Давайте разберем каждый планировщик.
Schedulers.io()
Этот планировщик основывается на неограниченном пуле потоков и используется для интенсивной работы с вводом-выводом без использования ЦП, например, доступ к файловой системе, выполнение сетевых вызовов, доступ к базе данных и так далее. Количество потоков в этом планировщике неограничено и может расти по мере необходимости.
Schedulers.computation()
Этот планировщик используется для выполнения работы, высоко нагружающей ЦП, такой как обработка больших объемов данных, изображений и так далее. Планировщик основывается на ограниченном пуле потоков с размером в количество доступных процессоров.
Так как этот планировщик подходит только для интенсивной работы с ЦП — количество его потоков ограничено. Сделано это для того, чтобы потоки не конкурировали за процессорное время и не простаивали.
Schedulers.newThread()
Этот планировщик создает совершенно новый поток при каждом вызове. В данном случае использование пула потоков не принесет никакой выгоды. Потоки очень затратно создавать и уничтожать. Вы должны быть осторожны и не злоупотреблять чрезмерным созданием потоков, так как это может привести в замедлению работы системы и переполнению памяти. Новый поток будет создаваться для обработки каждого элемента, полученного из observable-источника.
В идеале вы должны использовать этот планировщик довольно редко, в основном для выведения в отдельный поток долго работающих частей программы.
Schedulers.single()
Этот планировщик основывается на единственном потоке, который используется для последовательного выполнения задач. Он может быть очень полезен, когда у вас есть набор фоновых заданий в разных местах вашего приложения, но нельзя допустить одновременного выполнения более чем одного из этих заданий.
Schedulers.from(Executor executor)
Этот планировщик будет основываться на вашем собственном Executor . Может возникнуть ситуация, в которой необходимо будет выполнять определенные задачи в планировщике на основании собственной логики распределения потоков.
Допустим, вы хотите ограничить число параллельных сетевых вызовов, которые делает ваше приложение. Можно создать собственный планировщик, который будет работать на базе ограниченного в размерах пула потоков ( Scheduler.from(Executors.newFixedThreadPool(n)) ) и использовать его во всех местах, связанных с сетевыми вызовами.
AndroidSchedulers.mainThread()
Это специальный планировщик, который недоступен в библиотеке RxJava. Необходимо использовать расширяющую библиотеку RxAndroid для доступа к этому планировщику. Этот планировщик полезен в Android приложениях для выполнения действий в потоке пользовательского интерфейса.
По умолчанию этот планировщик ставит задания в очередь в Looper , связанный с основным потоком, но есть возможность переопределения: AndroidSchedulers.from(Looper looper) .
Заметка: Будьте осторожны в использовании планировщиков, основанных на неограниченных пулах потоков, таких как Schedulers.io() . Всегда есть риск бесконечного роста количества потоков.
Понимание subscribeOn() и observeOn()
Теперь, когда у вас есть представление о типах планировщиков, разберем subscribeOn() и observeOn() в деталях.
Вы должны глубоко разбираться в том, как эти два оператора работают по отдельности и вместе, чтобы профессионально работать с многопоточностью в RxJava.
subscribeOn()
Простыми словами, этот оператор говорит в какой поток наблюдаемый источник (source observable) будет передавать элементы. Вы должны уяснить важность слова «источник». Когда у вас цепь наблюдаемых элементов (observables), источник (source observable) — это всегда корневой элемент или верхняя часть цепи, откуда происходит создание событий.
Как вы уже видели, если не использовать subscribeOn() , то все события происходят в том потоке, в котором произошел вызов кода (в нашем случае — main поток).
Давайте перенаправим события в вычислительный поток с помощью subscribeOn() и планировщика Schedulers.computation() . Когда вы запустите нижеследующий пример кода, то увидите, что события происходят в одном из вычислительных потоков, доступных в пуле — RxComputThreadPool-1 .
В целях сокращения кода мы не будем полностью переопределять все методы DisposableSubscriber , так как нам не нужно переопределять onError() и onComplete() . Воспользуемся doOnNext() и лямбдами.
Не важно в каком месте в цепочке вызовов вы используете subscribeOn() . Он работает только с наблюдаемым источником (source observable), и контролирует в какой поток наблюдаемый источник передает события.
В нижеследующем примере после observable-источника создаются другие объекты observable (методами map() и filter() ), а оператор subscribeOn() помещен в конце цепочки вызовов. Но как только вы запустите этот код, то заметите, что все события будут возникать в потоке, указанном в subscribeOn() . Это станет более понятным при добавлении observeOn() в цепь вызовов. И даже если мы разместим subscribeOn() ниже observeOn() , то логика работы не изменится. subscribeOn() работает только с наблюдаемым источником (source observable).
Также важно понять, что нельзя использовать subscribeOn() несколько раз в одной цепочке вызовов. Можно, конечно, написать ещё раз, но никаких изменений это не повлечет. В примере ниже мы последовательно вызываем три различных планировщика, можете ли вы догадаться, какой планировщик сработает при запуске?
Если вы ответили Schedulers.io() , то вы правы! Даже если делать вызов многократно — сработает только первый subscribeOn() , вызванный после observable-источника.
Под капотом
Стоит потратить ещё немного времени на более подробное изучение рассмотренного примера. Почему срабатывает только планировщик Schedulers.io() ? Обычно все думают, что сработает Schedulers.newThread() , так как этот вызов находится в конце цепочки.
Необходимо понять, что в RxJava подписка создаётся после обратного вызова всех экземпляров Observable . Код ниже поможет нам разобраться в этом. Это ранее рассмотренный пример, но расписанный подробнее.
Для того, чтобы понять как всё работает — начнем разбирать всё с последней строки примера. В ней целевой подписчик (target subscriber), вызывает метод subscribe() у observable объекта o3 , который затем делает неявный вызов subscribe() у своего родительского observable объекта o2 . Реализация наблюдателя (observer), предоставляемая объектом o3 , умножает переданные числа на 10.
Процесс повторяется и o2 неявно вызывает subscribe() у объекта o1 , передавая реализацию наблюдателя, которая позволяет обрабатывать только четные числа. Теперь мы достигли корневого элемента ( o1 ), у которого нет родителя для последующего вызова subscribe() . На этом этапе завершается цепочка наблюдаемых (observable) элементов, после чего observable-источник начинает передавать (emit) элементы.
Теперь для вас должна быть понятна концепция работы подписок в RxJava. К настоящему времени у вас должно появиться понимание того, как формируются цепочки наблюдаемых (observable) объектов и как события распространяются, начиная с observable-источника.
observeOn()
Как мы уже видели, subscribeOn() указывает observable-источнику передавать элементы в определенный поток и этот поток будет отвечать за продвижение элементов вплоть до подписчика (Subscriber). Поэтому, по умолчанию, подписчик получает обработанные элементы в этом же потоке.
Но это может быть не то поведение, которого вы ожидаете. Предположим, вы хотите получить некие данные из сети и отобразить их в пользовательском интерфейсе.
Нужно выполнить две вещи:
- Сделать сетевой вызов в неблокирующем потоке ввода-вывода
- Получить результат в основном потоке приложения
У вас будет Observable , который осуществляет сетевой вызов в потоке ввода-вывода и передает результат подписчику. Если вы используете только subscribeOn(Schedulers.io()) , то целевой подписчик будет обрабатывать результат в том же потоке ввода-вывода. И нам не повезло, так как работать с пользовательским интерфейсом в Android можно только в основном потоке.
Теперь нам крайне необходимо переключить потоки и мы используем для этого observeOn() . Когда observeOn() встречается в цепочке вызовов, то передаваемые observable-источником элементы незамедлительно перебрасываются в поток, указанный в observeOn() .
В этом придуманном примере мы наблюдаем получение целых чисел из сети и их дальнейшую передачу из observable-источника. В реальных примерах это может быть любая другая асинхронная операция, например, чтение большого файла, выборка данных из базы данных и т.д. Вы можете запустить данный пример и посмотреть на результаты, просто следите за логами в консоли.
Теперь рассмотрим более сложный пример, в котором observeOn() будет вызываться несколько раз для переключения потоков в процессе обработки данных.
В примере выше observable-источник передаёт элементы в цепочку обработчиков в потоке ввода вывода, так как мы использовали subscribeOn() вместе с Schedulers.io() . Далее мы хотим преобразовать каждый элемент, используя оператор map() , но сделать это нужно в вычислительном потоке. Для этого используем observeOn() вместе с Schedulers.computation() перед вызовом map() для переключения потока и передачи элементов в целевой вычислительный поток.
Следующим шагом отфильтруем некоторые элементы и по какой-то причине мы хотим выполнить эту операцию в новом потоке для каждого из элементов. Используем снова observeOn() , но уже в паре с Schedulers.newThread() перед вызовом оператора filter() для передачи каждого элемента в новый поток.
В итоге мы хотим, чтобы подписчик получил результат обработки в потоке пользовательского интерфейса. Для этого используем observeOn() вместе с планировщиком AndroidSchedulers.mainThread() .
Но что произойдет, если использовать observeOn() несколько раз последовательно? В примере ниже в каком потоке подписчик получит результат?
Если запустите пример, то увидите, что подписчик получит элементы в вычислительном потоке RxComputationThreadPool-1 . Это значит, что сработал последний вызванный observeOn() . Интересно почему?
Под капотом
Возможно вы уже догадались. Как мы знаем, подписка (subscription) вызывается после обратного обхода всех Obsevable , но с передачей событий (emissions) всё происходит наоборот, то есть в обычном порядке, как записан код. Вызов происходит от observable-источника и далее вниз по цепочке вызова вплоть до подписчика.
Оператор observeOn() всегда работает в прямом порядке, поэтому последовательно происходит переключение потоков и последним происходит переключение на вычислительный поток ( observeOn(Schedulers.computation()) ). Итак, когда нужно переключить поток для обработки данных в новом потоке, то просто сначала вызовите observeOn() , а далее обрабатывайте элементы. Синхронизация, исключение состояния гонки, всё это и многие другие сложности многопоточности RxJava обрабатывает за вас.
Резюме
Сейчас у вас должно быть достаточно хорошее представление о том, как правильно использовать RxJava для написания многопоточных приложений, обеспечивающих быструю и плавную работу пользовательского интерфейса.
Если понимание не пришло сразу, ничего страшного. Прочитайте статью ещё раз, поэкспериментируйте с примерами кода. Здесь достаточно много нюансов для понимания, не торопитесь.
Источник