- Разбираемся с многопоточностью в RxJava
- 1. ObserveOn и SubscribeOn
- 2. Выполняем задачи параллельно.
- 3. Операторы с Schedulers.
- 4. Subjects.
- 5. Backpressure
- Заключение.
- RxJava: subscribeOn vs observeOn
- observeOn:
- subscribeOn:
- Thomas Nield
- YouTube Videos
- Saturday, February 13, 2016
- RxJava- Understanding observeOn() and subscribeOn()
- Concurrency and Multithreading in a Nutshell
- subscribeOn()
- Choosing a Scheduler
- observeOn()
Разбираемся с многопоточностью в RxJava
Когда описывают преимущества RxJava, всегда упоминают об удобстве организации работы многопоточного приложения средствами RxJava. То, как использовать операторы subscribeOn и observeOn, можно прочитать практически в каждой статье, посвященной основам RxJava. Например, здесь хорошо описаны случаи, когда использовать методы subscribeOn и когда observeOn. Однако, на практике часто приходится сталкиваться с проблемами, для которых нужно более глубокое понимание того, что именно делают методы subscribeOn и observeOn. В этой статье я хотел бы рассмотреть ряд вопросов, которые иногда возникают при использовании этих операторов.
Изучать нюансы RxJava можно разными способами: по документации (которая весьма подробна), по исходникам или же на практике. Я выбрал последний способ. Для этого я набросал пару тестов, по работе которых я смог лучше разобраться с асинхронным реактивным программированием.
Сначала для проверки работы смены потоков я использовал следующий код:
Проверим как работает этот код без всяких преобразований:
Результат:
Inside observable: main
Before transform: main
After transform: main
Inside doOnNext: main
In onNext: main
In onComplete: main
Как и ожидалось, никакой смены потоков.
1. ObserveOn и SubscribeOn
SubscribeOn
Как можно понять из документации reactivex.io/documentation/operators/subscribeon.html
с помощью этого оператора можно указать Scheduler, в котором будет выполняться процесс Observable.
Проверяем:
Результат:
Inside observable: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Начиная с выполнения содержимого Observable и до получения результата, все методы выполнялись в потоке, созданном Schedulers.io().
ObserveOn
В документации по этому методу сказано, что применение этого оператора приводит к тому, что последующие операции над “излученными” данными будут выполняться с помощью Scheduler, переданным в этот метод.
Результат:
Inside observable: main
Before transform: main
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Как и ожидалось, с момента применения метода observeOn поток, в котором производится обработка данных, будет изменен на тот, который ему выделит указанный Scheduler.
Объединим использование subscribeOn и observeOn:
Результат:
Inside observable: RxComputationThreadPool-3
Before transform: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Методы, выполняемые до применения оператора observeOn выполнились в Scheduler, указанном в subscribeOn, а после – в scheduler, указанном в observeOn.
Комбинируя эти два метода, можно добиться асинхронной загрузки данных из интернета и отображения их на экране в главном потоке приложения.
Но что будет, если применить эти методы несколько раз?
Для начала вызовем observeOn несколько раз:
Inside observable: main
Before transform: main
Between two observeOn: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Никаких сюрпризов. После применение observeOn обработка элементов производится с помощью указанного Scheduler.
Теперь вызовем subscribeOn несколько раз.
Результат:
Inside observable: RxComputationThreadPool-1
Before transform: RxComputationThreadPool-1
Between two observeOn: RxComputationThreadPool-1
After transform: RxComputationThreadPool-1
Inside doOnNext: RxComputationThreadPool-1
In onNext: RxComputationThreadPool-1
In onComplete: RxComputationThreadPool-1
Как видим, применение второго subscribeOn не привело ни каким изменениям. Но совсем ли он бесполезен?
Добавим между вызовами subscribeOn оператор:
Получим первое сообщение в логе:
Inside lift: RxCachedThreadScheduler-1
RxCachedThreadScheduler-1 — это именно тот поток, который был получен из Schedulers.io(), указанного во втором вызове subscribeOn.
lift() — это оператор, с помощью которого можно трансформировать subscription.
Можно схематично описать процесс выполнения подписки следующим образом:
Пользователь подписывается на observable, передавая subscription.
Этот subscription доставляется до корневого observable, при этом он может быть преобразован с помощью операторов.
Subscription передается в observable, отправляются onNext, onComplete, onError.
Над произведенными элементами выполняются преобразования
Преобразованные элементы попадают в onNext изначального subscriber.
Таким образом, когда subscription доставляется до observable, изменить поток можно с помощью subscribeOn. А когда элементы доставляются из observable в subscription – влияет observeOn.
Для того, что бы это проиллюстрировать рассмотрим код:
Подписчик, созданный в последней строчке, передается в Observable, созданный с помощью Observable.create(). Внутри оператора map вызывается оператор lift, куда передается Operation, который во время подписки декорирует Subscriber. Когда Observable излучает данные, они попадают в декорированный Subscriber. Декорированный Subscriber изменяет данные и отправляет их в оригинальный Subscriber.
Без изменения Scheduler весь процесс будет выполняться в потоке, в котором вызывается метод subscribe. Далее, пока Subscriber декорируется, с помощью subscribeOn можно изменить поток, в котором будет выполняться следующая декорация. В методе call() интерфейса OnSubscribe будет использоваться последний Scheduler, указанный в SubscribeOn. После излучения данных, Scheduler меняется уже с помощью onserveOn.
2. Выполняем задачи параллельно.
Рассмотрим следующий кейс:
Необходимо загрузить с сервера различную информацию, после этого скомпоновать ее и отобразить на экране. При этом, чтобы ускорить процесс, загружать данные стоит параллельно (если есть такая возможность). Если бы у нас не было RxJava, то эта задача требовала бы значительных усилий. Но с реактивным программированием эта задача тривиальна.
Мы будем выполнять три задачи, каждая из которых ждет 1 секунду, а потом отправляет сообщение в subscription. Далее с помощью оператора combineLatest все сообщения будут объединены и переданы в подписку.
Для проверки будем использовать следующий код:
Для начала запустим тест без всяких преобразований:
Результат:
Inside Observable1: main
Inside Observable2: main
Inside Observable3: main
Inside combining result: main
Before transform: main
After tranform: main
In onNext: main
In onComplete: main
Как видим, все выполняется в одном потоке. Наши три задачи выполняются последовательно.
Добавим subscribeOn и observeOn для observable, полученного с помощью функции zip.
Результат:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-1
Inside Observable3: RxCachedThreadScheduler-1
Inside combining result: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After tranform: RxNewThreadScheduler-1
In onNext: RxNewThreadScheduler-1
In onComplete: RxNewThreadScheduler-1
Все так, как и описывалось в предыдущей части статьи про subscribeOn и observeOn.
Теперь каждую из задач будем выполнять в своем потоке. Для этого достаточно указать Scheduler.io(), т.к. внутри него содержится пулл потоков, оптимальный для загрузки данных.
Результат:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-2
Inside Observable3: RxCachedThreadScheduler-3
Inside combining result: RxCachedThreadScheduler-3
Before transform: RxCachedThreadScheduler-3
After tranform: RxComputationThreadPool-3
In onNext: RxComputationThreadPool-3
In onComplete: RxComputationThreadPool-3
Мы добились того, чего и хотели — три наши задачи выполнились параллельно.
3. Операторы с Schedulers.
В предыдущей главе для эмулирования долгих задач отлично подошел бы оператор delay(), но проблема в том, что этот оператор не так прост, как может показаться на первый взгляд.
Существует ряд операторов, которые требуют указания Scheduler для свой работы. При этом есть их перегруженные версии, которые в качестве Scheduler используют computation(). delay() является примером такого оператора:
Несмотря на то, что мы не указывали никакой Scheduler, результат будет следующим:
LastSeenThread: RxComputationThreadPool-1
Для того, что бы избежать использования computation scheduler, достаточно третьим параметром передать требуемый scheduler:
.delay(1, TimeUnit.SECONDS, Schedulers.immediate())
Примечание: Schedulers.immediate() — выполняет задачу в том же потоке, в котором выполнялась предыдущая задача.
Результат:
LastSeenThread: main
Кроме delay() существуют и другие операторы, которые могут сами менять Scheduler: interval(), timer(), некоторые перегрузки buffer(), debounce(), skip(), take(), timeout() и некоторые другие.
4. Subjects.
При использовании Subjects стоит учесть то, что по умолчанию цепочка изменений данных, отправленных в onNext subject, будет выполняться в том же потоке, в котором был вызван метод onNext(). До тех пор, пока не встретится в цепочке преобразований оператор observeOn.
А вот применить subscribeOn так просто не получится.
Рассмотрим следующий код:
Тут указаны и observeOn и subscribeOn, но результат будет следующим:
doOnNext: RxCachedThreadScheduler-1
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1
Т.е. когда мы подписываемся на subject, он сразу возвращает значение и оно обрабатывается потоке из Shedulers.io(), а вот когда приходит следующее сообщение в subject, то используется поток, в котором был вызван onNext().
Поэтому, если вы после получения объекта из subject запускаете какую-то долгую операцию, то необходимо явно проставить observeOn между ними.
5. Backpressure
В этой статье невозможно не упомянуть о таком понятии как backpressure. MissingBackpressureException — ошибка, которая довольно много нервов мне подпортила. Я не стану тут пересказывать то, что можно прочитать в официальной wiki RxJava: github.com/ReactiveX/RxJava/wiki/Backpressure. Но если вы активно используете RxJava, то вам обязательно надо прочитать о backpressure.
Когда у вас в приложении имеется некоторый производитель данных в одном потоке и какой-то потребитель в другом, то стоит учитывать ситуацию, когда потребитель будет не успевать обрабатывать данные. В такой ситуации вам помогут операторы, описанные по приведенной ссылке.
Заключение.
“it makes the most sense for Subscribers to always assume that values are delivered asynchronously, even though on some occasions they may be delivered synchronously.”
“Для подписчика имеет смысл считать, что данные доставляются асинхронно, даже в тех случаях, когда они могут доставляться синхронно”.
Источник
RxJava: subscribeOn vs observeOn
One of the strongest aspects of RxJava is the simple way to schedule work on a desired thread using either subscribeOn or observeOn . While they seem simple enough at a glance, understanding how they work is crucial to achieving your desired threading assignment.
observeOn:
This method simply changes the thread of all operators further downstream (in the calls that come after). Let’s assume code is run from a UI thread:
One of the most frequent misconceptions is that observeOn also acts upstream, but really it acts only downstream — things that happen after the observeOn call — unlike subscribeOn .
subscribeOn:
This only influences the thread that is used when the Observable is subscribed to and it will stay on it downstream.
Position does not matter
subscribeOn can be put in any place in the stream because it affects only the time of subscription. For example, the code from above is equal to this one:
Methods that obey the contract with subscribeOn
The most basic example is Observable.create . All the work specified inside create body will be run on the thread specified in subscribeOn .
Another example is Observable.just , Observable.from or Observable.range . It is important to note that all of those methods accept values, so do not use blocking methods to create those values, as subscribeOn won’t affect it! If you want to use a blocking function, use Observable.defer as it accepts functions that will be evaluated lazily:
One important fact is that subscribeOn does not work with Subject s. (We will return to this in a future post).
If there are multiple instances of subscribeOn in the stream, only the first one has a practical* effect:
Subscribe and subscribeOn
People think that subscribeOn has something to do with Observable.subscribe , but really it does not have anything special to do with it. Remember, it only affects the subscription phase!
Knowing how subscribeOn and observeOn work, makes the Rx code much more easy to reason with. This understanding will allow you to use it correctly which should give you predictable results in your threading allocations.
* For some operators, additional subscribeOn has a minor side effect. If you are just a beginner you do not worry about it! But if you are interested in every quirky Rx aspect (like I am), check out this blog post by Dávid Karnok .
Источник
Thomas Nield
Random thoughts about analytics, business technology, and other musings.
YouTube Videos
Saturday, February 13, 2016
RxJava- Understanding observeOn() and subscribeOn()
RxJava makes multithreading easier, but the simplified abstraction more or less forces a veteran developer to re-learn multithreading the «RxJava way». Beginners still have to learn multithreading which is not a trivial topic, but RxJava makes it much more accessible.
A few people have asked if I could cover observeOn() and subscribeOn() in a similar manner that I have covered parallelization. Let’s take a hand-on approach to understand observeOn() and subscribeOn() . But first here is a little theory for newcomers to multithreading.
Concurrency and Multithreading in a Nutshell
If you have sizable experience with Java concurrency, please skip this section. Otherwise read on!
If you have never done multithreading/concurrency before, the idea is essentially multitasking. Think of a thread as a cursor executing one line of code at a time, which you can visibly see when you are using breakpoints in debug mode with Eclipse or IDEA. As you step through your code, each statement is executed top-to-bottom. In effect, you slowed down a thread and are now in control of it. You only have one thread traversing your code and executing each statement.
But when you multithread, you can have two or more threads (cursors) traversing your code and executing statements. For instance, you can have three threads doing three different tasks. Two threads could be importing two different sets of data simultaneously, while the third thread is asking the user for login credentials. These three tasks are being executing at the same time, and this is much better than having the user wait for each data set to be loaded before being prompted with a login.
However, when these three threads enter the same object and manipulate its properties, there is a high risk of the cursors overlapping. They can start to race each other and compete chaotically to evaluate and change the object’s properties. That is why immutability should be your default policy with properties, and when properties have to be mutable you use synchronization. While you should always strive for immutability, RxJava greatly reduces the likelihood of race conditions and other multithreading problems. Problems will only likely happen when you create side effects manipulating objects outside the Observable chain.
Another common use of multithreading is parallelization. Say you have 10,000 objects and you need to perform an expensive calculation on each one. Instead of iterating them and executing the process one at a time, you can process 5 at at time by passing them to 5 worker threads. This could make the process up to 5 times faster.
Think of this as a checkout line where having 5 cashiers is better than 1, because you can process a high volume of customers faster. But of course, like threads, cashiers are expensive. If you have 30 customers to process, it is probably not practical to have 30 cashiers due to operational constraints. It is better to have 5 cashiers and «re-use» them after they process each customer, and they can take another customer waiting in the queue. This is effectively what a thread pool does. It internally maintains a set of threads and will queue tasks to delegate to them.
Most computers nowadays have multiple «cores», or processors built into the CPU. If you have a quad-core, you can optimally support 5 computational threads (4 cores + 1 extra thread for idle time). If you have 8 cores, you can optimally support 9 threads, and so on. If you exceed this simple rough formula (e.g. running 6 threads or more on a 4-core machine) you risk compromising performance. But not every task is computational. Importing and exporting data (called IO tasks) is much less taxing on the CPU. You could theoretically have 10 threads on a 4-core machine without issue if they all are simply importing/exporting data.
While RxJava takes a lot of the pain out of concurrency and multithreading, I highly recommend knowing how to use concurrency without RxJava, just so you are aware of the «gotchas» multithreading can manifest. Benjamin Winterberg created an awesome tutorial on Java 8 concurrency which I recommend reading. If you want some deep knowledge on concurrency, check out Java: Concurrency in Practice by Brian Goetz.
subscribeOn()
Before we bring concurrency into the discussion, think long and hard how an Observable chain of operators actually works. You have to have a source Observable where the emissions originate from. Only one emission at a time can be pushed up the entire chain of operators to the Subscriber . By default, the thread that declares the subscription (the subscribe() method) is the thread that pushes these emissions from the source, one a a time, all the way to the Subscriber .
For example, take this simple Observable operation that emits three String objects and maps their lengths.
The subscribe() operation on the second line will receive the emissions and print them. By default, the thread that declares the subscribe() is the one that pushes items from the source all the way up the chain to the Subscriber . If you run this program you will see the following output, indicating this Observable emitted items on the main thread.
This means the main thread (the thread which started the program) executed the emissions of this Observable , and it pushed each emission through the map() operator to the Subscriber . Since the main thread becomes occupied with pushing the emissions, the program will not exit until the Observable is done pushing emissions and calls onCompleted() .
Let’s say we wanted to subscribe to this Observable but we do not want to do it on the current main thread. Pretend calculating the lengths takes awhile. Perhaps we would like to kick off the calculations but not hold up the main thread. That main thread has places to go, things to do, and needs to kick off other tasks. We can do that with the subscribeOn() and specify a Scheduler . This will emit the items from the source on a different thread.
If our task is computational, we should use Schedulers.computation() . This will allocate one of a few fixed number of threads to this Observable operation, and the source will emit items on that thread.
But you may run into a problem and not get any output. Why? With our simple program, the main thread passed off the execution of our Observable chain to a computation thread. The main thread reached the end of the main() method and exited the program, before the computation thread got a chance to emit any items!
You will not likely encounter this with real programs that are kept alive for a session, but for our example we need to keep our main thread alive long enough to see the subscription work. Just make it sleep for three seconds and that should give plenty of time to subscribe and execute the emissions.
Your output should now look like this.
Note that the emissions happened on a computation thread, or more specifically a thread named RxComputationThreadPool-1 . This thread emitted all these items. A common misconception is that multiple threads will automatically parallelize your emissions, but this is not true as it would break the Observable contract. You can only direct emissions of an Observable from one single thread to another single thread. Parallelization is only possible when you create separate Observables as shown here.
It does not matter where in your Observable chain to put the subscribeOn() . No matter where you put it, it will tell the source Observable which thread to emit items on. If you specify multiple subscribeOn() operators, the one closest to the source (the left-most), will be the one used. As a matter of fact, a few source Observable factories, like Observable.interval() , will already specify a subscribeOn() internally. Observable.interval() will already emit on the computation scheduler, and any subscribeOn() you specify on it will do nothing.
In summary, subscribeOn() instructs the source Observable which thread to emit items on, and this thread will push items all the way to the Subscriber . However, if it encounters an observeOn() somewhere in the chain (discussed shortly), it will then pass emissions to another thread for the remaining operations at that point.
Choosing a Scheduler
There are several other Schedulers such as Schedulers.io() , which is optimal for IO-related tasks (and it caches and re-uses threads to increase efficiency). Then there is Schedulers.newThread() which simply creates a new thread for each subscription. You have to be careful with both of these because in theory they could create an unlimited number of threads (this can cause bad performance). For computational tasks, you should use Schedulers.computation() so the number of threads are limited based on the number of cores your machine has.
You can also use Schedulers.from() to specify your own Executor. Especially for parallization, I found this approach to have better performance.
observeOn()
It is helpful to instruct a source Observable which Scheduler to use via subscribeOn() , and the source Observable will emit items on one of that Scheduler ‘s threads. However, it is often helpful in the middle of an Observable chain to switch to another Scheduler . For example, you may press a button on a UI and it kicks off work on a computation thread, which frees up the UI thread so the UI does not freeze. But when the computation is done, it needs to be displayed back on the UI. Oftentimes, when you working with UI technologies like JavaFX, Swing, or Android, you have to update the UI on the Event Dispatch Thread.
Take this example. We emit the numbers 1 through 10 and do some simple multiplication to them. By default the emissions happen on the main thread since we do not specify a subscribeOn() . But before the map(i -> i * 10) operation we switch the emissions over to a computation thread.
If you run this code you should get this output.
You will see the emissions initially occurred on the main thread and were pushed on that thread all the way to the first map() . But after that the observeOn() redirected the emissions to a computation thread, which pushed the emissions to the second map() and the final Subscriber .
Still not clear? Let’s look at this visually. No matter what Scheduler you are subscribed on, only one emission is allowed to travel up the Observable chain of operators at a time. Below, you can observe that the emission must be pushed all the way from the source to the Subscriber before the next emission can start.
Let’s say we wanted to switch to another thread after Operator 2. Perhaps we finished calculating something and now we want to update the UI on the UI thread. Or maybe we finished importing a data set on the io() Scheduler and now we want to do computations on the computation() Scheduler.
You can do this with the observeOn() operator as shown below. Notice how the bottom stream passed an emission to the top stream, allowing the bottom stream to start the next emission without waiting for the current one to reach the Subscriber .
The bottom stream represents a stream of operators on one Scheduler , and the top one represents another. Once an emission is passed from the bottom stream to the top one, the bottom stream is no longer concerned with it. It is now the top stream’s responsibility to get that emission to the Subscriber .
From what I understand, one problem that may arise with observeOn() is the bottom stream can produce emissions faster than the top stream can process them. This can cause issues with backpressure you may have to consider. I’m definitely not an authority on backpressure but I’ve been burned enough to be wary of it.
Effectively, you can only use one subscribeOn() , but you can have any number of observeOn() operators. You can switch emissions from one thread to another with ease using observeOn() . But do not use it everywhere for the sake of. Only use it when you find a calculation is intense enough that it needs to be offloaded to another thread.
For UI technologies, there are a couple of libraries that bridge RxJava with a UI Scheduler . For example, there is RxJavaFX which has a Scheduler that puts emissions on the JavaFX Platform thread. There is also the RxJava Android Module which has Schedulers for Android. There is even RxSwing for those of us stuck with legacy Swing applications. These are very helpful to use in conjunction with observeOn() when working with user interfaces.
Let me know if you have any questions or comments. Be sure to read about parallelization in my other article as well.
Источник