Android subscribe on main thread

Understanding RxJava subscribeOn and observeOn

One of the biggest strengths of RxJava is its ability to easily schedule work and process results on various threads. This article aims to give you a solid foundation of working with threads in RxJava and RxAndroid to optimize system performance while avoiding bugs (threading-related bugs are notoriously hard to track down).

RxJava Schedulers

Threading in RxJava is done with help of Schedulers. Scheduler can be thought of as a thread pool managing 1 or more threads. Whenever a Scheduler needs to execute a task, it will take a thread from its pool and run the task in that thread.

Let’s summarize available Scheduler types and their common uses:

  1. Schedulers.io() is backed by an unbounded thread pool. It is used for non CPU-intensive I/O type work including interaction with the file system, performing network calls, database interactions, etc. This thread pool is intended to be used for asynchronously performing blocking IO.
  2. Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.
  3. Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive as new thread is spawned every time and no reuse happens.
  4. Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Scheduler.from(Executors.newFixedThreadPool(n)) . This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until it is explicitly shutdown.
  5. Main thread or AndroidSchedulers.mainThread() is provided by the RxAndroid extension library to RxJava. Main thread (also known as UI thread) is where user interaction happens. Care should be taken not to overload this thread to prevent janky non-responsive UI or, worse, Application Not Responding” (ANR) dialog.
  6. Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
  7. Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.

WARNING: Be careful writing multi-threaded code using unbounded thread Schedulers such as Schedulers.io() and Schedulers.newThread() . Depending on your data stream and the transformations you apply to it, it’s easier than you think to flood your system with threads.

Default threading in RxJava

If you don’t specify threading in RxJava (if you don’t specify subscribeOn , observeOn or both), the data will be emitted and processed by the current scheduler/thread (usually the main thread). For instance, all operators in the chain below will be processed by the current thread.

Note: some operators, such as interval , operate on a computation thread by default. See below for more details.

We can specify a thread to execute any operator by using subscribeOn and/or observeOn .

  • subscribeOn affects upstream operators (operators above the subscribeOn )
  • observeOn affects downstream operators (operators below the observeOn )
  • If only subscribeOn is specified, all operators will be be executed on that thread
  • If only observeOn is specified, all operators will be executed on the current thread and only operators below the observeOn will be switched to thread specified by the observeOn

For instance, in the following chain:

Data emission just and the map operator will be executed on the io scheduler as directed by the upstream operator subscribeOn .

filter will be executed on the computation scheduler as directed by the downstream operator observeOn .

Read on for more details, ways to debug as well as nuances of the threading operator in RxJava.

Debugging threading

As before, let’s look at a basic RxJava chain where we emit Strings and calculate their lengths.

When executed, this will print:

Now, let’s see what thread this work is being done on by printing out thread info in doOnNext() , a side effect operator that gets executed for each item emitted.

When executed, this will print:

So this stream is being emitted and processed on the main thread which makes sense because the block of code above resides inside the main method of my class.

Doing work on background thread

Often it makes sense to delegate certain work to a background thread. A typical example would be offloading an IO operation from the main thread. RxJava makes it easy.

subscribeOn() operator tells the source Observable which thread to emit and push items on all the way down to Observer (hence, it affects both upstream and downstream operators). It does not matter where you put the subscribeOn() in your Observable chain of operators.

Things to remember about our Observable are:

  1. It is a “cold” Observable which means the emission occurs lazily only when Subscriber is added (call to subscribe() is made). New emission will happen for each subscriber added.
  2. subscribeOn() specifies a Scheduler (thread pool) where the work will be performed after subscription is made in subscribe() .
  3. The results of transformation are received on the same thread as the thread that did the actual work. This can be changed using observeOn() as we’ll see soon.
Читайте также:  Ccleaner android 4pda premium

Let’s run the updated code example inside the main method.

When executed, this will print nothing!

This is because the main method finished executing before the background thread returned results. To get around this, let’s keep the main method alive for an additional 3 seconds with Thread. sleep(3000) — long enough to give our Observable a chance to fire emissions on the background thread.

When executed, this will print:

The results of the background thread work are returned on the same thread, RxNewThreadScheduler-1.

When performing Network/IO/computation tasks, using background scheduler is crucial. Without subscribeOn() , your code will use a caller thread to perform operations, causing Observable to become blocking.

Understanding observeOn()

As we saw above, subscribeOn() instructs the source Observable which thread to emit items on — this thread will push the emissions all the way to our Observer . However, if it encounters an observeOn() anywhere in the chain, it will switch and pass emissions using that Scheduler for the remaining (downstream) operations.

Usually the observing thread in Android is the main (UI) thread, AndroidSchedulers.mainThread() . This requires RxAndroid extension library to RxJava.

Let’s modify our example code to perform background work on Schedulers.newThread() but then switch to AndroidSchedulers.mainThread() .

When executed, we will see that now results are received by the main thread.

Doing work asynchronously

While RxJava is known as a library for composing asynchronous and event-based programs using observable sequences, there are a plenty of useful tasks it can do synchronously. For instance, map(String::length) above handles each item using the same thread RxNewThreadScheduler-1 sequentially preserving the same order.

Simply using subscribeOn() at the start of an Observable chain means the process is still operating on a single thread and emitting items synchronously downstream. However, when you start combining different streams on different threads or use operators such as observeOn() , interval() , delay() , your Observable chain is no longer synchronous.

Now, let’s see how the example above can be modified so that each item emitted is processed by a separate thread simultaneously.

Introducing flatMap()

flatMap() wraps each item being emitted by an Observable letting you apply its own RxJava operators including assigning a new Scheduler using subscribeOn() to handle those operators. Once all items inside flatMap() have been processed, the individual Observable s are then merged back into a single Observable in no particular order.

To make things more realistic, let us pretend that a transformation for each item takes up to 3 seconds to complete. The following 2 things should hold true:

  1. Each item is processed by its own thread
  2. Due to random time it takes to process each item, the order of the items completed is not guaranteed.

This will result in the following output:

Notice that a) each item was processed by a separate thread and b) the order of the elements after the transformation is random. So flatMap() worked exactly as we expected.

What if you need to preserve the order of the resulting items?

Introducing concatMap()

concatMap() is similar to flatMap() but guarantees that the order of the items processed is the same as in the original emission.

This results in the following output:

Note that the items are returned in the same order as in the original stream.

subscribeOn() gotchas

As seen above, subscribeOn() changes the thread on which our Observable is emitted and transformed. In the absence of observeOn() , the results of the stream processing are sent to the thread that did the work (thread specified in subscribeOn() ). For instance, if we have subscribeOn(Schedulers.computation()) and observeOn() is not specified, the results are dispatched to the Computation thread as well.

It does not matter where you put subscribeOn() operator within your chain — it will still denote the thread on which the Observable will be emitted on.

If you specify multiple subscribeOn() RxJava operators in your chain, only the first one will be used and the following ones will be ignored unless the subscribeOn() is used inside flatMap() as seen above.

Here is an example:

This will result in the following output:

Note that Schedulers.computation() thread pool above did the work while Schedulers.newThread() was never used. This is because the computation Scheduler was listed first and all subsequent subscribeOn() operators were simply ignored.

Default Schedulers

Some libraries specify subscribeOn() internally to enforce which thread does the background work. For instance, Observable.delay() from RxJava library will emit on the Computation Scheduler by default. Any subscribeOn() you specify on it will do nothing. However, you can use an overloaded version of the factory method for that operator instead to pass custom Scheduler of your choice.

Pro-tip: RxLint can warn you when you use an operator such as delay() without overriding its default Scheduler.

What this also means is that when you use Scheduler-dependent operators such as delay() , interval() , etc. while using subscribeOn() , you may be spawning (but not using) a thread without realizing it. Always review the Javadoc for those operators to ensure the optimal usage. In particular, pay attention to @SchedulerSupport annotation.

Читайте также:  Nomad reader для андроид

onError()

Finally, when subscribeOn() is used but the onError() is not, if an error occurs, it will be thrown on the subscribed Scheduler thread but the error stacktrace will have no reference to the place where you subscribed. This will make debugging extremely hard. To avoid the issue, use onError() .

observeOn() gotchas

It’s important to remember that unlike subscribeOn() , placement of observeOn() matters. Switching scheduler with observeOn() applies to all downstream operators (operators listed below observeOn() ).

For instance, in the following example due to observeOn() placement map(String::length) and filter(length -> length == 6) will be executed on the main thread. Is this really what was intended?

Be careful where you put the observeOn() operator because it changes the Scheduler performing the work! In most cases you probably want to delay switching to the observing thread until the very end of your Rx chain.

For instance, let’s look at the following RxJava chain which makes an HTTP network call:

There is no reason to have observeOn() operator applied above the map() operator. In fact, this code will result in NetworkOnMainThreadException! We do not want to be reading from HTTP response on the main thread — it should be done before we switch back to the main thread:

Be wary of multiple observeOn()

You can have multiple observeOn() operators. As operators are executed downstream, each observeOn() below will override the one above.

Here is an example:

The output is as follows:

As a final note, I would recommend that you avoid this kind of complexity if at all possible. Doing so will make it significantly easier to debug and maintain this code in the future. If you are not convinced, check out Dan Lew’s podcast linked in the Resources section.

  1. Reactive Programming on Android with RxJava by Chris Arriola
  2. Articles by Thomas Nield on RxJava and maximizing parallelization part1, part2 and part3
  3. Learning RxJava book by Thomas Nield
  4. Dan Lew’s Fragmented Podcast talk on the importance of keeping it simple with RxJava
  5. Exploring RxJava 2 for Android talk by Jake Wharton.

Thanks to Alex Hart for his input with this article.

Источник

Разбираемся с многопоточностью в RxJava


Когда описывают преимущества RxJava, всегда упоминают об удобстве организации работы многопоточного приложения средствами RxJava. То, как использовать операторы subscribeOn и observeOn, можно прочитать практически в каждой статье, посвященной основам RxJava. Например, здесь хорошо описаны случаи, когда использовать методы subscribeOn и когда observeOn. Однако, на практике часто приходится сталкиваться с проблемами, для которых нужно более глубокое понимание того, что именно делают методы subscribeOn и observeOn. В этой статье я хотел бы рассмотреть ряд вопросов, которые иногда возникают при использовании этих операторов.

Изучать нюансы RxJava можно разными способами: по документации (которая весьма подробна), по исходникам или же на практике. Я выбрал последний способ. Для этого я набросал пару тестов, по работе которых я смог лучше разобраться с асинхронным реактивным программированием.

Сначала для проверки работы смены потоков я использовал следующий код:

Проверим как работает этот код без всяких преобразований:

Результат:
Inside observable: main
Before transform: main
After transform: main
Inside doOnNext: main
In onNext: main
In onComplete: main

Как и ожидалось, никакой смены потоков.

1. ObserveOn и SubscribeOn

SubscribeOn
Как можно понять из документации reactivex.io/documentation/operators/subscribeon.html
с помощью этого оператора можно указать Scheduler, в котором будет выполняться процесс Observable.
Проверяем:

Результат:
Inside observable: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

Начиная с выполнения содержимого Observable и до получения результата, все методы выполнялись в потоке, созданном Schedulers.io().

ObserveOn
В документации по этому методу сказано, что применение этого оператора приводит к тому, что последующие операции над “излученными” данными будут выполняться с помощью Scheduler, переданным в этот метод.

Результат:
Inside observable: main
Before transform: main
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

Как и ожидалось, с момента применения метода observeOn поток, в котором производится обработка данных, будет изменен на тот, который ему выделит указанный Scheduler.

Объединим использование subscribeOn и observeOn:

Результат:
Inside observable: RxComputationThreadPool-3
Before transform: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

Методы, выполняемые до применения оператора observeOn выполнились в Scheduler, указанном в subscribeOn, а после – в scheduler, указанном в observeOn.

Комбинируя эти два метода, можно добиться асинхронной загрузки данных из интернета и отображения их на экране в главном потоке приложения.

Но что будет, если применить эти методы несколько раз?
Для начала вызовем observeOn несколько раз:

Inside observable: main
Before transform: main
Between two observeOn: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

Никаких сюрпризов. После применение observeOn обработка элементов производится с помощью указанного Scheduler.

Теперь вызовем subscribeOn несколько раз.

Результат:
Inside observable: RxComputationThreadPool-1
Before transform: RxComputationThreadPool-1
Between two observeOn: RxComputationThreadPool-1
After transform: RxComputationThreadPool-1
Inside doOnNext: RxComputationThreadPool-1
In onNext: RxComputationThreadPool-1
In onComplete: RxComputationThreadPool-1

Как видим, применение второго subscribeOn не привело ни каким изменениям. Но совсем ли он бесполезен?

Добавим между вызовами subscribeOn оператор:

Читайте также:  Обход гугл аккаунта после сброса настроек андроид

Получим первое сообщение в логе:
Inside lift: RxCachedThreadScheduler-1

RxCachedThreadScheduler-1 — это именно тот поток, который был получен из Schedulers.io(), указанного во втором вызове subscribeOn.

lift() — это оператор, с помощью которого можно трансформировать subscription.
Можно схематично описать процесс выполнения подписки следующим образом:
Пользователь подписывается на observable, передавая subscription.
Этот subscription доставляется до корневого observable, при этом он может быть преобразован с помощью операторов.
Subscription передается в observable, отправляются onNext, onComplete, onError.
Над произведенными элементами выполняются преобразования
Преобразованные элементы попадают в onNext изначального subscriber.
Таким образом, когда subscription доставляется до observable, изменить поток можно с помощью subscribeOn. А когда элементы доставляются из observable в subscription – влияет observeOn.
Для того, что бы это проиллюстрировать рассмотрим код:

Подписчик, созданный в последней строчке, передается в Observable, созданный с помощью Observable.create(). Внутри оператора map вызывается оператор lift, куда передается Operation, который во время подписки декорирует Subscriber. Когда Observable излучает данные, они попадают в декорированный Subscriber. Декорированный Subscriber изменяет данные и отправляет их в оригинальный Subscriber.
Без изменения Scheduler весь процесс будет выполняться в потоке, в котором вызывается метод subscribe. Далее, пока Subscriber декорируется, с помощью subscribeOn можно изменить поток, в котором будет выполняться следующая декорация. В методе call() интерфейса OnSubscribe будет использоваться последний Scheduler, указанный в SubscribeOn. После излучения данных, Scheduler меняется уже с помощью onserveOn.

2. Выполняем задачи параллельно.

Рассмотрим следующий кейс:
Необходимо загрузить с сервера различную информацию, после этого скомпоновать ее и отобразить на экране. При этом, чтобы ускорить процесс, загружать данные стоит параллельно (если есть такая возможность). Если бы у нас не было RxJava, то эта задача требовала бы значительных усилий. Но с реактивным программированием эта задача тривиальна.

Мы будем выполнять три задачи, каждая из которых ждет 1 секунду, а потом отправляет сообщение в subscription. Далее с помощью оператора combineLatest все сообщения будут объединены и переданы в подписку.

Для проверки будем использовать следующий код:

Для начала запустим тест без всяких преобразований:

Результат:
Inside Observable1: main
Inside Observable2: main
Inside Observable3: main
Inside combining result: main
Before transform: main
After tranform: main
In onNext: main
In onComplete: main

Как видим, все выполняется в одном потоке. Наши три задачи выполняются последовательно.

Добавим subscribeOn и observeOn для observable, полученного с помощью функции zip.

Результат:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-1
Inside Observable3: RxCachedThreadScheduler-1
Inside combining result: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After tranform: RxNewThreadScheduler-1
In onNext: RxNewThreadScheduler-1
In onComplete: RxNewThreadScheduler-1

Все так, как и описывалось в предыдущей части статьи про subscribeOn и observeOn.

Теперь каждую из задач будем выполнять в своем потоке. Для этого достаточно указать Scheduler.io(), т.к. внутри него содержится пулл потоков, оптимальный для загрузки данных.

Результат:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-2
Inside Observable3: RxCachedThreadScheduler-3
Inside combining result: RxCachedThreadScheduler-3
Before transform: RxCachedThreadScheduler-3
After tranform: RxComputationThreadPool-3
In onNext: RxComputationThreadPool-3
In onComplete: RxComputationThreadPool-3

Мы добились того, чего и хотели — три наши задачи выполнились параллельно.

3. Операторы с Schedulers.

В предыдущей главе для эмулирования долгих задач отлично подошел бы оператор delay(), но проблема в том, что этот оператор не так прост, как может показаться на первый взгляд.
Существует ряд операторов, которые требуют указания Scheduler для свой работы. При этом есть их перегруженные версии, которые в качестве Scheduler используют computation(). delay() является примером такого оператора:

Несмотря на то, что мы не указывали никакой Scheduler, результат будет следующим:
LastSeenThread: RxComputationThreadPool-1

Для того, что бы избежать использования computation scheduler, достаточно третьим параметром передать требуемый scheduler:
.delay(1, TimeUnit.SECONDS, Schedulers.immediate())

Примечание: Schedulers.immediate() — выполняет задачу в том же потоке, в котором выполнялась предыдущая задача.

Результат:
LastSeenThread: main

Кроме delay() существуют и другие операторы, которые могут сами менять Scheduler: interval(), timer(), некоторые перегрузки buffer(), debounce(), skip(), take(), timeout() и некоторые другие.

4. Subjects.

При использовании Subjects стоит учесть то, что по умолчанию цепочка изменений данных, отправленных в onNext subject, будет выполняться в том же потоке, в котором был вызван метод onNext(). До тех пор, пока не встретится в цепочке преобразований оператор observeOn.
А вот применить subscribeOn так просто не получится.

Рассмотрим следующий код:

Тут указаны и observeOn и subscribeOn, но результат будет следующим:
doOnNext: RxCachedThreadScheduler-1
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1

Т.е. когда мы подписываемся на subject, он сразу возвращает значение и оно обрабатывается потоке из Shedulers.io(), а вот когда приходит следующее сообщение в subject, то используется поток, в котором был вызван onNext().

Поэтому, если вы после получения объекта из subject запускаете какую-то долгую операцию, то необходимо явно проставить observeOn между ними.

5. Backpressure

В этой статье невозможно не упомянуть о таком понятии как backpressure. MissingBackpressureException — ошибка, которая довольно много нервов мне подпортила. Я не стану тут пересказывать то, что можно прочитать в официальной wiki RxJava: github.com/ReactiveX/RxJava/wiki/Backpressure. Но если вы активно используете RxJava, то вам обязательно надо прочитать о backpressure.
Когда у вас в приложении имеется некоторый производитель данных в одном потоке и какой-то потребитель в другом, то стоит учитывать ситуацию, когда потребитель будет не успевать обрабатывать данные. В такой ситуации вам помогут операторы, описанные по приведенной ссылке.

Заключение.

“it makes the most sense for Subscribers to always assume that values are delivered asynchronously, even though on some occasions they may be delivered synchronously.”

“Для подписчика имеет смысл считать, что данные доставляются асинхронно, даже в тех случаях, когда они могут доставляться синхронно”.

Источник

Оцените статью