Does the Order of subscribeOn and observeOn Matter

Does the order of subscribeOn and observeOn matter?

Where you call subscribeOn() in a chain doesn't really matter. Where you call observeOn() does matter.

subscribeOn() tells the whole chain which thread to start processing on. You should only call it once per chain. If you call it again lower down the stream it will have no effect.

observeOn() causes all operations which happen below it to be executed on the specified scheduler. You can call it multiple times per stream to move between different threads.

Take the following example:

doSomethingRx()
    .subscribeOn(BackgroundScheduler)
    .doAnotherThing()
    .observeOn(ComputationScheduler)
    .doSomethingElse()
    .observeOn(MainScheduler)
    .subscribe(/* ... */)

  • The subscribeOn causes doSomethingRx to be called on the BackgroundScheduler.
  • doAnotherThing continues on the BackgroundScheduler.
  • The first observeOn switches the stream to the ComputationScheduler.
  • doSomethingElse happens on the ComputationScheduler.
  • The second observeOn switches the stream to the MainScheduler.
  • subscribe happens on the MainScheduler.
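
The walkthrough above can be modelled without RxJava at all. The following is a minimal sketch, assuming a toy single-value stream (MiniStream, a hypothetical class that is not part of any Rx library) whose subscribeOn and observeOn only mimic the threading rules described here. The named executors are stand-ins for the schedulers, and the three stages stand in for doSomethingRx, doAnotherThing and doSomethingElse.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy single-value stream. NOT RxJava: it only models the threading rules. */
final class MiniStream<T> {
    // Called once per subscriber with the callback that receives the value.
    private final Consumer<Consumer<T>> onSubscribe;

    private MiniStream(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> MiniStream<T> create(Consumer<Consumer<T>> source) {
        return new MiniStream<T>(source);
    }

    /** Transforms the value on whatever thread delivers it. */
    <R> MiniStream<R> map(Function<T, R> f) {
        return new MiniStream<R>(down -> onSubscribe.accept(v -> down.accept(f.apply(v))));
    }

    /** Runs the subscription, and therefore the source, on the executor. */
    MiniStream<T> subscribeOn(Executor e) {
        return new MiniStream<T>(down -> e.execute(() -> onSubscribe.accept(down)));
    }

    /** Hands the value to the executor, so the stages below run there. */
    MiniStream<T> observeOn(Executor e) {
        return new MiniStream<T>(down -> onSubscribe.accept(v -> e.execute(() -> down.accept(v))));
    }

    void subscribe(Consumer<T> observer) {
        onSubscribe.accept(observer);
    }
}

final class ThreadingDemo {
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static String thread() {
        return Thread.currentThread().getName();
    }

    /** Returns "stage:thread" entries in the order the stages ran. */
    static List<String> run() {
        ExecutorService background = named("BackgroundScheduler");
        ExecutorService computation = named("ComputationScheduler");
        ExecutorService main = named("MainScheduler");
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            MiniStream.<String>create(emit -> {               // doSomethingRx
                    log.add("source:" + thread());
                    emit.accept("Some string");
                })
                .subscribeOn(background)
                .map(s -> { log.add("first:" + thread()); return s.length(); })  // doAnotherThing
                .observeOn(computation)
                .map(n -> { log.add("second:" + thread()); return 2 * n; })      // doSomethingElse
                .observeOn(main)
                .subscribe(n -> { log.add("subscribe:" + thread()); done.countDown(); });
            done.await(); // wait until the final stage has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            background.shutdown();
            computation.shutdown();
            main.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the log reads source and first on BackgroundScheduler, second on ComputationScheduler, and subscribe on MainScheduler, which matches the bullet list above. Real schedulers use thread pools and handle errors and disposal, none of which the sketch attempts.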

RxAndroid: What's the difference between subscribeOn and observeOn

In case you find the above answer full of jargon:

tl;dr

Observable.just("Some string")
    .map(str -> str.length())
    .observeOn(Schedulers.computation())
    .map(length -> 2 * length)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(/* ... */)

Start from the observable and run the first map on an IO thread (because subscribeOn(Schedulers.io()) is in the chain). Then switch to a computation thread and run map(length -> 2 * length). Finally, observe the output on the main thread (the last observeOn()).

Anyway,

observeOn() simply changes the thread of all operators further downstream. A common misconception is that observeOn also affects the operators upstream of it, but it doesn't.

The example below shows this, assuming subscribe() is called from the UI thread:

Observable.just("Some string")              // UI
    .map(str -> str.length())               // UI
    .observeOn(Schedulers.computation())    // changing the thread
    .map(length -> 2 * length)              // Computation
    .subscribe(/* ... */)                   // Computation

subscribeOn() only determines the thread on which the Observable is subscribed to. Emissions then stay on that thread on their way downstream until an observeOn() changes it.

Observable.just("Some String")                            // Computation
    .map(str -> str.length())                             // Computation
    .map(length -> 2 * length)                            // Computation
    .subscribeOn(Schedulers.computation())                // changing the thread
    .subscribe(number -> Log.d("", "Number " + number));  // Computation

Position does not matter (subscribeOn())

Why?
Because it affects only the time of subscription.

Methods that obey the contract with subscribeOn

-> Basic example : Observable.create

All the work specified inside the create body will run on the thread specified in subscribeOn.

Another example: Observable.just, Observable.from or Observable.range

Note: All of these methods accept values that have already been computed, so do not use blocking methods to create those values; subscribeOn won't affect where they run.

If you want to use blocking functions, use

Observable.defer(() -> Observable.just(blockingMethod()));
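
The reason just(blockingMethod()) escapes subscribeOn is ordinary Java evaluation order: an argument is computed before the method it is passed to is called. Below is a minimal sketch using only the standard library, where CompletableFuture.supplyAsync on a named executor is an assumed stand-in for subscribeOn, and blockingMethod is a hypothetical placeholder that merely reports the thread it ran on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DeferDemo {
    /** Hypothetical blocking call; it only reports the thread it ran on. */
    static String blockingMethod() {
        return Thread.currentThread().getName();
    }

    /** Returns {thread of the eager call, thread of the deferred call}. */
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            // Like just(blockingMethod()): the value is computed right here,
            // on the caller's thread, before any scheduling can happen.
            String eagerValue = blockingMethod();
            String eager = CompletableFuture.supplyAsync(() -> eagerValue, worker).join();

            // Like defer(() -> just(blockingMethod())): the call sits inside the
            // lambda, so it runs only when the worker thread executes it.
            String deferred = CompletableFuture.supplyAsync(() -> blockingMethod(), worker).join();

            return new String[] {eager, deferred};
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("eager ran on " + result[0] + ", deferred ran on " + result[1]);
    }
}
```

Only the deferred call reports the worker thread; the eager one reports whichever thread called run().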

Important Fact:

subscribeOn does not work with Subjects

Multiple subscribeOn:

If there are multiple instances of subscribeOn in the stream, only the first one (the one closest to the source) has a practical effect.
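
One way to picture why the subscribeOn closest to the source wins: the subscription travels up the chain, each subscribeOn re-dispatches it, and so the last hop before the source decides the thread. Here is a minimal sketch of that hand-off, assuming plain named executors as stand-ins for schedulers; it models the idea only and does not use RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class NestedSubscribeOnDemo {
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Models source.subscribeOn(io).subscribeOn(computation); returns the source's thread. */
    static String sourceThread() {
        ExecutorService io = named("io");
        ExecutorService computation = named("computation");
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        try {
            // The lower subscribeOn(computation) is reached first and hops to its thread...
            computation.execute(() ->
                // ...then the upper subscribeOn(io) hops again, right before the source runs.
                io.execute(() -> ranOn.complete(Thread.currentThread().getName())));
            return ranOn.join();
        } finally {
            computation.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("source ran on " + sourceThread());
    }
}
```

The second hop still happens, it just has no visible effect on where the source runs, which is why the extra subscribeOn is described as having no practical effect.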

Subscribe & subscribeOn

People think that subscribeOn has something to do with Observable.subscribe, but it doesn't have anything special to do with it.
It only affects the subscription phase.

Source : Tomek Polański (Medium)

RxSwift subscribeOn and observeOn not on the expected background Thread

There is no way to force a function to call its callback on a particular thread unless the library gives you the option to do it. Your successBlock and errorBlock closures will be called on whatever thread QBRequest.logIn wants to call them on and there's nothing you can do about that.

That said, the .observeOn(_:) operator will transfer the execution to a different thread so in your last code example, your onNext and onError closures will be executed on the self.scheduler thread.

Should subscribeOn and observeOn only be invoked by the final subscriber?

Great to see you have read the book and are taking the time to challenge some of the guidance there.

The reason I give this guidance is because

  1. Concurrency is hard, and having a simple rule helps teams produce better code.
  2. Concurrency is hard, and having a single place to locate your concurrency concerns allows for a better mental model of your stack/layering and should simplify testing. The more layers that introduce concurrency in your application, the worse.
  3. Blocking the UI thread is not good news. Getting off the UI thread as soon as possible, and then delaying any processing of data back on the UI thread as late as possible, is preferable. This pattern aims to serve that goal.

These are obviously my opinions, but I have seen these simple guidelines help clean up code on dozens of projects, reduce code bases, improve testability, improve predictability and in numerous cases massively improve performance.

Sadly, it is difficult to put together case studies of these projects as most of them are protected by NDAs.

I would be keen to see how it works for you or how you apply an alternate pattern.

RxJava multiple subscribeOn() in chain

It looks like it won't have any effect. It doesn't matter where you put subscribeOn in the chain; it will have the same effect.

As per the documentation, observeOn behaves differently, so you can change the thread on which you're observing the result at any point in the chain.

From the Rx documentation:

the SubscribeOn operator designates which thread the Observable will
begin operating on, no matter at what point in the chain of operators
that operator is called. ObserveOn, on the other hand, affects the
thread that the Observable will use below where that operator appears.
For this reason, you may call ObserveOn multiple times at various
points during the chain of Observable operators in order to change on
which threads certain of those operators operate.

http://reactivex.io/documentation/operators/subscribeon.html

Dispose (cancel) observable. SubscribeOn and observeOn different schedulers

Interesting test case. There is a small bug: it should be if strongSelf.isCancelled instead of if !strongSelf.isCancelled. Apart from that, the test case shows the problem.

Intuitively, I would expect a check for whether a dispose has already taken place before emitting, if it happens on the same thread.

I also found this:

just to make this clear, if you call dispose on one thread (like
main), you won't observe any elements on that same thread. That is a
guarantee.

see here: https://github.com/ReactiveX/RxSwift/issues/38

So maybe it is a bug.

To be sure I opened an issue here:
https://github.com/ReactiveX/RxSwift/issues/1778

Update

It seems it was actually a bug. Meanwhile, the fine people at RxSwift have confirmed it and fortunately fixed it very quickly. See the issue link above.

Testing

The bug was fixed with commit bac86346087c7e267dd5a620eed90a7849fd54ff. So if you are using CocoaPods, you can simply use something like the following for testing:

target 'RxSelfContained' do
  use_frameworks!
  pod 'RxAtomic', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
  pod 'RxSwift', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
end

Why should I subscribe on the main thread?

subscribeOn vs. observeOn

subscribeOn calls the Observable's create method on the given Scheduler. It does not matter how many times you use subscribeOn: the subscribeOn closest to the source observable (the first in the chain) always wins.

observeOn switches threads between one operator and the next. When upstream emits a value on Thread X, the value is handed over to Thread Y, taken from the Scheduler given to the observeOn operator. Everything below observeOn is then processed on Thread Y.

Best guess on the provided Example 1:
Using subscribeOn calls Observable#create on Schedulers#io. Everything in the create lambda is called on this Schedulers#io thread. The listener callback (onTextChanged), however, can happen on another thread. In this case it is the UI thread, because the source is some kind of UI element. So onNext is called from the UI thread (emitter.onNext(it)). The value is emitted to the map operator on the UI thread (.map { cheeseSearchEngine.search(it) }), and cheeseSearchEngine#search blocks the UI thread.

Example 2:
This uses ".subscribeOn(AndroidSchedulers.mainThread())" as its first operator. It has no actual effect, because you are already on the UI thread. In this case the create lambda is called from AndroidSchedulers#mainThread. onNext is emitted on the UI thread as well, just as in Example 1, because the UI triggers the onTextChanged event. The value is then passed through observeOn(Schedulers.io()). Everything from that observeOn onward is executed on a Schedulers#io thread, so the UI is not blocked when map performs an HTTP request (or some other long-running IO). After map finishes and emits the next value downstream, the next observeOn(AndroidSchedulers.mainThread()) switches back to the UI thread. Therefore you can safely change the UI in the subscribe lambda, because you are on the UI thread.
In conclusion, the first subscribeOn in Example 2 can be omitted if it does not matter on which thread the listener registration happens (the listener registration must probably be thread-safe).

Summary:
Using subscribeOn only invokes the create lambda on a thread of the given Scheduler. The callback from a listener registered inside create may still happen on another thread.
This is why Example 1 blocks the UI thread and Example 2 does not.
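
The summary can be reproduced with plain threads. The following is a minimal sketch, assuming a hypothetical FakeTextField that invokes its listener on whichever thread calls type(), with named executors standing in for Schedulers.io() and the UI thread. It is not Android or RxJava code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class ListenerThreadDemo {
    /** Hypothetical widget: it invokes its listener on whichever thread calls type(). */
    static final class FakeTextField {
        private volatile Consumer<String> listener = text -> { };

        void setListener(Consumer<String> listener) {
            this.listener = listener;
        }

        void type(String text) {
            listener.accept(text);
        }
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns {thread that registered the listener, thread that ran the callback}. */
    static String[] run() {
        ExecutorService io = named("io");
        ExecutorService ui = named("ui");
        FakeTextField field = new FakeTextField();
        String[] threads = new String[2];
        try {
            // Like subscribeOn(io): the body of create runs on the io thread
            // and only registers the listener there.
            io.submit(() -> {
                threads[0] = Thread.currentThread().getName();
                field.setListener(text -> threads[1] = Thread.currentThread().getName());
            }).get();

            // The "UI" later fires the event, so the callback runs on the ui thread.
            ui.submit(() -> field.type("cheddar")).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.println("registered on " + t[0] + ", callback ran on " + t[1]);
    }
}
```

Registration happens on io, but the callback (and anything it calls synchronously, such as a blocking search) runs on ui. Moving that work off the UI thread therefore needs a thread switch after the emission, which is the role observeOn plays in Example 2.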

RxJava observeOn and subscribeOn in Retrofit

Basically, we have

Observable.subscribe(Observer); // => the Observer observes the Observable, and the Observable subscribes the Observer

Example

requestInterface.callApi().subscribe(new Observer...); // requestInterface.callApi() <=> Observable

From http://reactivex.io/documentation/operators/subscribeon.html

SubscribeOn

  • The SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called.

ObserveOn (affects two things)

  • It instructs the Observable to send notifications to Observers on a specified Scheduler.

  • ObserveOn affects the thread that the Observable will use below where that operator appears.

Example

registerUserReturnedObserverble()      // runs on a worker thread because of subscribeOn(Schedulers.io()) (line 5)
    .andThen(loginReturnObserverble()) // runs on a worker thread because of subscribeOn(Schedulers.io()) (line 5)
    .observeOn(AndroidSchedulers.mainThread())
    .andThen(getUserDataReturnObserverble()) // runs on the main thread because .observeOn(AndroidSchedulers.mainThread()) is above this operator (line 3)
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<Void>() {
        // runs on the main thread because of observeOn(AndroidSchedulers.mainThread())
    });

