ObserveOn and SubscribeOn - Where the Work Is Being Done

There's a lot of misleading info out there about SubscribeOn and ObserveOn.

Summary

  • SubscribeOn intercepts calls to the single method of IObservable<T>, which is Subscribe, and calls to Dispose on the IDisposable handle returned by Subscribe.
  • ObserveOn intercepts calls to the methods of IObserver<T>, which are OnNext, OnCompleted & OnError.
  • Both methods cause the respective calls to be made on the specified scheduler.
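To make the two bullets concrete, here is a minimal Java model of the two interceptors. It uses only the JDK, not Rx.NET. MiniObserver, MiniObservable, Interceptors and run() are hypothetical names invented for this sketch, and the Runnable returned from subscribe stands in for the IDisposable handle. subscribeOn re-posts subscribe and dispose onto an executor; observeOn re-posts onNext and onCompleted.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins for IObserver<T> and IObservable<T> (not a real Rx API).
interface MiniObserver<T> {
    void onNext(T value);
    void onCompleted();
}

interface MiniObservable<T> {
    Runnable subscribe(MiniObserver<T> observer); // the returned Runnable plays the role of Dispose
}

final class Interceptors {
    // subscribeOn: moves subscribe() and the dispose call onto the executor; observer calls pass through.
    // Assumes a single-threaded executor, so dispose can never overtake subscribe.
    static <T> MiniObservable<T> subscribeOn(MiniObservable<T> source, Executor ex) {
        return observer -> {
            AtomicReference<Runnable> inner = new AtomicReference<Runnable>(() -> { });
            ex.execute(() -> inner.set(source.subscribe(observer)));
            return () -> ex.execute(() -> inner.get().run());
        };
    }

    // observeOn: moves onNext/onCompleted onto the executor; subscribe() and dispose pass through.
    static <T> MiniObservable<T> observeOn(MiniObservable<T> source, Executor ex) {
        return observer -> source.subscribe(new MiniObserver<T>() {
            public void onNext(T value) { ex.execute(() -> observer.onNext(value)); }
            public void onCompleted() { ex.execute(observer::onCompleted); }
        });
    }

    static String thread() { return Thread.currentThread().getName(); }

    // Subscribes to a Return(1)-like source through one decorator, disposes, and logs each call's thread.
    static List<String> run(boolean viaSubscribeOn) {
        List<String> log = new CopyOnWriteArrayList<>();
        MiniObservable<Integer> returnOne = observer -> {
            log.add("subscribe:" + thread());
            observer.onNext(1);
            observer.onCompleted();
            return () -> log.add("dispose:" + thread());
        };
        MiniObserver<Integer> sink = new MiniObserver<Integer>() {
            public void onNext(Integer v) { log.add("onNext:" + thread()); }
            public void onCompleted() { log.add("onCompleted:" + thread()); }
        };
        ExecutorService ex = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        MiniObservable<Integer> wrapped = viaSubscribeOn ? subscribeOn(returnOne, ex) : observeOn(returnOne, ex);
        wrapped.subscribe(sink).run(); // subscribe, then dispose straight away
        ex.shutdown();
        try {
            ex.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

Interceptors.run(true) logs every call on the "worker" thread, whereas Interceptors.run(false) leaves subscribe and dispose on the caller and only moves onNext and onCompleted.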

Analysis & Demonstrations

The statement

"ObserveOn sets where the code in the Subscribe handler is executed"

is more confusing than helpful. What you are referring to as the "Subscribe handler" is really an OnNext handler. Remember, the Subscribe method of IObservable accepts an IObserver that has OnNext, OnCompleted and OnError methods; it is extension methods that provide the convenience overloads that accept lambdas and build an IObserver implementation for you.
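A sketch of what such a convenience overload does, written in plain Java rather than C#. FullObserver, SimpleSource and SubscribeOverloads are hypothetical names, and rethrowing from onError is this sketch's own choice. The lambda passed in only ever becomes the onNext part of a full observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for IObserver<T>/IObservable<T>: subscribe only accepts a full observer.
interface FullObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

interface SimpleSource<T> {
    void subscribe(FullObserver<T> observer);
}

final class SubscribeOverloads {
    // Convenience overload in the spirit of Subscribe(onNext): it builds the observer for you.
    static <T> void subscribe(SimpleSource<T> source, Consumer<T> onNextAction) {
        source.subscribe(new FullObserver<T>() {
            public void onNext(T value) { onNextAction.accept(value); }
            public void onError(Throwable error) { throw new RuntimeException(error); } // sketch's choice
            public void onCompleted() { }
        });
    }

    static List<Integer> demo() {
        List<Integer> seen = new ArrayList<>();
        SimpleSource<Integer> source = observer -> {
            observer.onNext(1);
            observer.onNext(2);
            observer.onCompleted();
        };
        subscribe(source, seen::add); // this lambda is the so-called "Subscribe handler": really an OnNext handler
        return seen;
    }
}
```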

Let me appropriate the term, though: I think of the "Subscribe handler" as the code in the observable that is invoked when Subscribe is called. Seen this way, the description above more closely resembles the purpose of SubscribeOn.

SubscribeOn

SubscribeOn causes the Subscribe method of an observable to be executed asynchronously on the specified scheduler or context. You use it when you don't want to call the Subscribe method on an observable from whatever thread you are running on - typically because it can be long-running and you don't want to block the calling thread.

When you call Subscribe, you are calling an observable that may be part of a long chain of observables. SubscribeOn only affects the observable it is applied to. It may be the case that all the observables in the chain will be subscribed to immediately and on the same thread, but it doesn't have to be. Think about Concat, for example: it only subscribes to each successive stream once the preceding stream has finished, and typically this will take place on whatever thread the preceding stream called OnCompleted from.
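A deliberately minimal Java model of that Concat behaviour (not Rx's actual implementation; SeqObserver, SeqObservable and ConcatSketch are hypothetical names). Because the second source is subscribed from inside the first source's onCompleted, that subscription happens on whatever thread the first source completed on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal observer/observable pair (not a real Rx API).
interface SeqObserver<T> {
    void onNext(T value);
    void onCompleted();
}

interface SeqObservable<T> {
    void subscribe(SeqObserver<T> observer);
}

final class ConcatSketch {
    // Subscribes to `second` only from inside first's onCompleted.
    static <T> SeqObservable<T> concat(SeqObservable<T> first, SeqObservable<T> second) {
        return downstream -> first.subscribe(new SeqObserver<T>() {
            public void onNext(T value) { downstream.onNext(value); }
            public void onCompleted() { second.subscribe(downstream); }
        });
    }

    // Returns the name of the thread on which `second` was subscribed to.
    static String secondSubscribedOn() {
        ExecutorService ex = Executors.newSingleThreadExecutor(r -> new Thread(r, "first-source"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        SeqObservable<Integer> first = o -> ex.execute(() -> { o.onNext(1); o.onCompleted(); });
        SeqObservable<Integer> second = o -> {
            seen.set(Thread.currentThread().getName());
            o.onNext(2);
            o.onCompleted();
        };
        concat(first, second).subscribe(new SeqObserver<Integer>() {
            public void onNext(Integer value) { }
            public void onCompleted() { done.countDown(); }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ex.shutdown();
        return seen.get();
    }
}
```

ConcatSketch.secondSubscribedOn() returns "first-source", not the caller's thread.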

So SubscribeOn sits between your call to Subscribe and the observable you are subscribing to, intercepting the call and making it asynchronous.

It also affects disposal of subscriptions. Subscribe returns an IDisposable handle which is used to unsubscribe. SubscribeOn ensures calls to Dispose are scheduled on the supplied scheduler.

A common point of confusion when trying to understand what SubscribeOn does is that the Subscribe handler of an observable may well call OnNext, OnCompleted or OnError on this same thread. However, SubscribeOn's purpose is not to affect these calls. It's not uncommon for a stream to complete before the Subscribe method returns. Observable.Return does this, for example. Let's take a look.

If you use the Spy method I wrote, and run the following code:

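using System;
using System.Reactive.Linq;
using System.Threading;

// Spy is the author's own logging extension method; it is not part of Rx.
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");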

You get this output (thread id may vary of course):

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

You can see that the entire subscription handler ran on the same thread, and finished before returning.
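The Spy helper itself isn't shown here. The following hypothetical Java analogue (JDK only; SpyObserver, SpyObservable and SpySketch are invented names) logs the same kind of events for a Return-like source that emits and completes inside subscribe, so every entry lands on the caller's thread before "Subscribe returned" is logged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal observer/observable pair (not a real Rx API).
interface SpyObserver<T> {
    void onNext(T value);
    void onCompleted();
}

interface SpyObservable<T> {
    void subscribe(SpyObserver<T> observer);
}

final class SpySketch {
    static String thread() { return Thread.currentThread().getName(); }

    // Hypothetical Spy analogue: logs creation, subscription and notifications, then forwards them.
    static <T> SpyObservable<T> spy(SpyObservable<T> source, String name, List<String> log) {
        log.add(name + ": Observable obtained on Thread: " + thread());
        return observer -> {
            log.add(name + ": Subscribed to on Thread: " + thread());
            source.subscribe(new SpyObserver<T>() {
                public void onNext(T value) {
                    log.add(name + ": OnNext(" + value + ") on Thread: " + thread());
                    observer.onNext(value);
                }
                public void onCompleted() {
                    log.add(name + ": OnCompleted() on Thread: " + thread());
                    observer.onCompleted();
                }
            });
            log.add(name + ": Subscription completed.");
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Like Observable.Return(1): emits and completes inside subscribe itself.
        SpyObservable<Integer> returnOne = o -> { o.onNext(1); o.onCompleted(); };
        spy(returnOne, "Return", log).subscribe(new SpyObserver<Integer>() {
            public void onNext(Integer value) { }
            public void onCompleted() { }
        });
        log.add("Subscribe returned");
        return log;
    }
}
```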

Let's use SubscribeOn to run this asynchronously. We will Spy on both the Return observable and the SubscribeOn observable:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

This outputs (line numbers added by me):

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.

01 - The main method is running on thread 1.

02 - The Return observable is evaluated on the calling thread. We're just getting the IObservable here; nothing is subscribing yet.

03 - The SubscribeOn observable is evaluated on the calling thread.

04 - Now, finally, we call the Subscribe method of SubscribeOn.

05 - The Subscribe method of SubscribeOn returns straight away, having only scheduled the real subscription...

06 - ... and thread 1 returns to the main method. This is the effect of SubscribeOn in action!

07 - Meanwhile, SubscribeOn scheduled a call to Return's Subscribe method on the default scheduler. Here it is received on thread 2.

08 - And as Return does, it calls OnNext on the Subscribe thread...

09 - and SubscribeOn is just a pass through now.

10,11 - Same for OnCompleted

12 - And last of all the Return subscription handler is done.

Hopefully that clears up the purpose and effect of SubscribeOn!

ObserveOn

If you think of SubscribeOn as an interceptor for the Subscribe method that passes the call on to a different thread, then ObserveOn does the same job, but for the OnNext, OnCompleted and OnError calls.

Recall our original example:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

Which gave this output:

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

Now let's alter this to use ObserveOn:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

We get the following output:

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2

01 - The main method is running on Thread 1.

02 - As before, the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.

03 - The ObserveOn observable is evaluated on the calling thread too.

04 - Now we subscribe, again on the calling thread, first to the ObserveOn observable...

05 - ... which then passes the call through to the Return observable.

06 - Now Return calls OnNext in its Subscribe handler.

07 - Here is the effect of ObserveOn. We can see that the OnNext is scheduled asynchronously on Thread 2.

08 - Meanwhile Return calls OnCompleted on Thread 1...

09 - And Return's subscription handler completes...

10 - and then so does ObserveOn's subscription handler...

11 - so control is returned to the main method

12 - Meanwhile, ObserveOn has shuttled Return's OnCompleted call over to Thread 2. This could have happened at any time during 09-11 because it is running asynchronously; it just so happens that it is called now.

What are the typical use cases?

You will most often see SubscribeOn used in a GUI when you need to Subscribe to a long-running observable and want to get off the dispatcher thread as soon as possible, perhaps because you know it's one of those observables that does all its work in the subscription handler. Apply it at the end of the observable chain, because this is the first observable called when you subscribe.

You will most often see ObserveOn used in a GUI when you want to ensure OnNext, OnCompleted and OnError calls are marshalled back to the dispatcher thread. Apply it at the end of the observable chain to transition back as late as possible.

Hopefully you can see that the answer to your question is that ObserveOnDispatcher won't make any difference to the threads that Where and SelectMany are executed on; it all depends on which thread stream (the source observable in your question) is calling them from! The subscription handler of stream will be invoked on the calling thread, but it's impossible to say where Where and SelectMany will run without knowing how stream is implemented.

Observables with lifetimes that outlive the Subscribe call

Up until now, we've been looking exclusively at Observable.Return. Return completes its stream within the Subscribe handler. That's not atypical, but it's equally common for streams to outlive the Subscribe handler. Look at Observable.Timer for example:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");

This returns the following:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2

You can clearly see the subscription complete, and then OnNext and OnCompleted being called later on a different thread.

Note that no combination of SubscribeOn or ObserveOn will have any effect whatsoever on which thread or scheduler Timer chooses to invoke OnNext and OnCompleted on.
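To see why, here is a small JDK-only Java model (TickObserver, TickObservable and TimerSketch are hypothetical names, not Rx's Timer). The timer source always emits from its own scheduler thread, and subscribeOn only changes which thread calls subscribe.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal observer/observable pair (not a real Rx API).
interface TickObserver<T> {
    void onNext(T value);
    void onCompleted();
}

interface TickObservable<T> {
    void subscribe(TickObserver<T> observer);
}

final class TimerSketch {
    // Timer-like source: whoever subscribes, it emits 0 later on its own scheduler thread.
    static TickObservable<Long> timer(ScheduledExecutorService scheduler, long delayMillis) {
        return o -> scheduler.schedule(() -> { o.onNext(0L); o.onCompleted(); }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // subscribeOn only moves the subscribe call.
    static <T> TickObservable<T> subscribeOn(TickObservable<T> source, ExecutorService ex) {
        return o -> ex.execute(() -> source.subscribe(o));
    }

    // Returns {thread that subscribed to the timer, thread that received onNext}.
    static String[] demo() {
        ScheduledExecutorService timerThread = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timer"));
        ExecutorService subscribeThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "subscriber"));
        AtomicReference<String> subscribedOn = new AtomicReference<>();
        AtomicReference<String> emittedOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        TickObservable<Long> source = timer(timerThread, 50);
        TickObservable<Long> recorded = o -> {
            subscribedOn.set(Thread.currentThread().getName());
            source.subscribe(o);
        };
        subscribeOn(recorded, subscribeThread).subscribe(new TickObserver<Long>() {
            public void onNext(Long value) { emittedOn.set(Thread.currentThread().getName()); }
            public void onCompleted() { done.countDown(); }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timerThread.shutdown();
        subscribeThread.shutdown();
        return new String[] { subscribedOn.get(), emittedOn.get() };
    }
}
```

TimerSketch.demo() returns {"subscriber", "timer"}: the subscribe call moved, the emission did not.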

Sure, you can use SubscribeOn to determine the Subscribe thread:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

(I am deliberately changing to the NewThreadScheduler here to prevent confusion in the case of Timer happening to get the same thread pool thread as SubscribeOn)

Giving:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3

Here you can clearly see the main thread (1) returning after its Subscribe calls, the Timer subscription getting its own thread (2), and the OnNext and OnCompleted calls running on thread (3).

Now for ObserveOn, let's change the code to (for those following along in code, use nuget package rx-wpf):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

This code is a little different. The first line ensures we have a dispatcher, and we also bring in ObserveOnDispatcher - this is just like ObserveOn, except it specifies we should use the DispatcherScheduler of whatever thread ObserveOnDispatcher is evaluated on.

This code gives the following output:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1

Note that the dispatcher (and main thread) are thread 1. Timer is still calling OnNext and OnCompleted on the thread of its choosing (2) - but the ObserveOnDispatcher is marshalling calls back onto the dispatcher thread, thread (1).

Also note that if we were to block the dispatcher thread (say, with a Thread.Sleep), you would see that the calls through ObserveOnDispatcher are held up (this code works best inside a LINQPad main method):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");

And you'll see output like this:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1

The calls through ObserveOnDispatcher are only able to get out once the Sleep has finished.
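The dispatcher behaviour can be modelled in a few lines of Java. In this hypothetical JDK-only sketch (Dispatcher, observeOn and DispatcherSketch are invented names), ObserveOnDispatcher is reduced to "post the call to a queue that only the dispatcher thread drains", and waiting on a latch stands in for Thread.Sleep so that the ordering is deterministic.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class DispatcherSketch {
    // Hypothetical dispatcher: posted work waits in a queue until the owning thread pumps it.
    static final class Dispatcher {
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        void post(Runnable work) { queue.add(work); }
        void pumpOne() throws InterruptedException { queue.take().run(); } // runs on the calling thread
    }

    // The ObserveOnDispatcher idea: don't call downstream directly, post the call instead.
    static <T> Consumer<T> observeOn(Dispatcher dispatcher, Consumer<T> downstream) {
        return value -> dispatcher.post(() -> downstream.accept(value));
    }

    static String thread() { return Thread.currentThread().getName(); }

    // The calling thread plays the dispatcher (UI) thread.
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        Dispatcher dispatcher = new Dispatcher();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timer"));
        CountDownLatch timerFired = new CountDownLatch(1);
        Consumer<Long> onNext = observeOn(dispatcher, v -> log.add("ObserveOn: OnNext(" + v + ") on " + thread()));
        log.add("Blocking the dispatcher");
        timer.schedule(() -> {
            log.add("Timer: OnNext(0) on " + thread());
            onNext.accept(0L); // only posts to the dispatcher queue
            timerFired.countDown();
        }, 50, TimeUnit.MILLISECONDS);
        try {
            timerFired.await();   // stands in for Thread.Sleep: the dispatcher thread is busy and pumps nothing
            log.add("Unblocked");
            dispatcher.pumpOne(); // only now does the marshalled OnNext run, on this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        timer.shutdown();
        return log;
    }
}
```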

Key points

It's useful to keep in mind that Reactive Extensions is essentially a free-threaded library and tries to be as lazy as possible about what thread it runs on. To change this, you have to deliberately intervene with ObserveOn, SubscribeOn, or by passing specific schedulers to operators that accept them.

There's nothing a consumer of an observable can do to control what it's doing internally - ObserveOn and SubscribeOn are decorators that wrap the surface area of observers and observables to marshal calls across threads. Hopefully these examples have made that clear.

What's the difference between SubscribeOn and ObserveOn

I had a similar problem a while back and asked this question about it. I think the responses (including the comments) there will answer your question. To summarize:

  • If you want to update controls on a GUI thread, use ObserveOn. If you reference System.Reactive.Windows.Forms.dll, you get .ObserveOn(form), which is handy.
  • SubscribeOn controls the thread on which the actual call to subscribe happens. The problem solved here is that WinForms and WPF will throw exceptions if you add event handlers from multiple different threads.

Also, this post was very helpful in figuring out the relationship between ObserveOn and SubscribeOn.

RxJava: subscribeOn and observeOn not working as expected

Operators never modify the stream they are applied to: each one returns a new stream that receives notifications from the previous one and passes them on in an altered manner. Since you called the subscribeOn and observeOn operators but didn't store their results, the subscription made later is on the unaltered stream.
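A hypothetical Java illustration of the point (ConfiguredStream is an invented stand-in, not RxJava's Observable): each operator returns a new object, so calling it without keeping the result leaves the original untouched.

```java
// Hypothetical immutable stand-in for an RxJava Observable: each operator returns
// a NEW object and never changes the one it was called on.
final class ConfiguredStream {
    private final String subscribeScheduler; // null means "the calling thread"
    private final String observeScheduler;

    ConfiguredStream(String subscribeScheduler, String observeScheduler) {
        this.subscribeScheduler = subscribeScheduler;
        this.observeScheduler = observeScheduler;
    }

    ConfiguredStream subscribeOn(String scheduler) { return new ConfiguredStream(scheduler, observeScheduler); }

    ConfiguredStream observeOn(String scheduler) { return new ConfiguredStream(subscribeScheduler, scheduler); }

    String describe() {
        return "subscribeOn=" + (subscribeScheduler == null ? "caller" : subscribeScheduler)
            + ", observeOn=" + (observeScheduler == null ? "caller" : observeScheduler);
    }

    static String[] demo() {
        ConfiguredStream source = new ConfiguredStream(null, null);
        source.subscribeOn("io"); // result discarded: `source` is unchanged
        source.observeOn("main"); // result discarded as well
        ConfiguredStream chained = source.subscribeOn("io").observeOn("main"); // keep the returned stream
        return new String[] { source.describe(), chained.describe() };
    }
}
```

ConfiguredStream.demo() returns "subscribeOn=caller, observeOn=caller" for the original and "subscribeOn=io, observeOn=main" for the chained result.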

One side note: I didn't quite understand your definition of subscribeOn behavior. If you meant that map operators are somehow affected by it, this is not true. subscribeOn defines the Scheduler on which the OnSubscribe function is called; in your case, that is the function you pass to the create() method. observeOn, on the other hand, defines the Scheduler on which each successive stream (the streams returned by the applied operators) handles emissions coming from upstream.

SubscribeOn and observeOn in the main thread

The problem with your approach is that you can't know how many tasks should be executed at a given time, and you also can't avoid deadlocking while waiting for tasks that should only happen after you unblock the main thread.

Returning to the Java main thread is not supported by any 1.x extension I know of. For 2.x, there is the BlockingScheduler from the extensions project that allows you to do that:

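import hu.akarnokd.rxjava2.schedulers.BlockingScheduler;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class Main {
    public static void main(String[] args) {
        BlockingScheduler scheduler = new BlockingScheduler();

        // execute() keeps the main thread busy running work scheduled on
        // `scheduler` until shutdown() is called.
        scheduler.execute(() -> {
            Flowable.range(1, 10)
                .subscribeOn(Schedulers.io())
                .observeOn(scheduler)
                .doAfterTerminate(() -> scheduler.shutdown())
                .subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
        });

        System.out.println("BlockingScheduler finished");
    }
}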

Note the call to scheduler.shutdown() which has to be called eventually to release the main thread, otherwise your program may never terminate.
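For intuition only, here is a JDK-only sketch of the idea behind a main-thread scheduler like this. It is not BlockingScheduler's actual source; MainThreadLoop and its methods are hypothetical. The calling thread runs an initial action and then drains a task queue until shutdown() enqueues a stop marker.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: the calling thread drains a queue until shutdown().
final class MainThreadLoop implements Executor {
    private static final Runnable STOP = () -> { };
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    @Override
    public void execute(Runnable task) { queue.add(task); }

    void shutdown() { queue.add(STOP); } // FIFO: tasks queued earlier still run first

    // Runs `initial`, then keeps running queued tasks on the calling thread until STOP arrives.
    void run(Runnable initial) throws InterruptedException {
        initial.run();
        for (Runnable task = queue.take(); task != STOP; task = queue.take()) {
            task.run();
        }
    }

    static List<String> demo() {
        MainThreadLoop loop = new MainThreadLoop();
        List<String> seen = new CopyOnWriteArrayList<>();
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        try {
            loop.run(() -> io.execute(() -> { // work starts on "io", like subscribeOn(io)
                for (int v = 1; v <= 3; v++) {
                    int value = v;
                    // like observeOn(loop): each result hops back onto the looping thread
                    loop.execute(() -> seen.add(value + " on " + Thread.currentThread().getName()));
                }
                loop.shutdown(); // like doAfterTerminate(shutdown)
            }));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        io.shutdown();
        return seen;
    }
}
```

As with the real scheduler, forgetting to call shutdown() here would leave the calling thread blocked in queue.take() forever.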

RxJava2 .subscribeOn .observeOn confusion. Running on Main thread

The order in which you put observeOn(), subscribeOn() and other operators is very important.

  • The subscribeOn() operator tells the source Observable which thread to emit and transform items on.

  • Be careful where you put the observeOn() operator because it changes
    the thread performing the work! In most cases you probably want to delay
    switching to the observing thread until the very end of your Rx chain.

    Observable.just("long", "longer", "longest")
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .map(String::length)
    .filter(length -> length == 6)
    .subscribe(length -> System.out.println("item length " + length));

Here, map, filter and the final consumer all run on the main thread.

  • observeOn() before map()
    There is no reason to apply the observeOn() operator above the map() operator.
    In fact, if map() reads from an HTTP response, this will result in a NetworkOnMainThreadException! We do not want to be reading from an HTTP response on the main thread; that should be done before we switch back to the main thread.

  • You can also use multiple observeOn() calls to switch threads, as in this example:

     Observable.just("long", "longer", "longest")
    .doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.computation())
    .map(String::toString)
    .doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.io())
    .map(String::toString)
    .subscribeOn(Schedulers.newThread())
    .map(String::length)
    .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));

Output:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

Note: according to this answer, subscribeOn() doesn't apply to the downstream operators, so it does not guarantee that your operation is going to run on a different thread.

"subscribeOn effects go upstream and closer to the source of events."

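As for your problem, I ran a test, and here are the results:

import android.util.Log;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Callable;

// TAG is assumed to be a String constant of the enclosing class.
private void testRxJava2Async() {
    io.reactivex.Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Log.d(TAG, "callable (expensive async method) was called on --> " + Thread.currentThread().getName());
            // RxJava 2 does not allow null values: returning null here would
            // signal onError(NullPointerException) instead of onNext.
            return "result";
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
            Log.d(TAG, "onNext() was called on --> " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
}

Test result:

callable (expensive async method) was called on --> RxCachedThreadScheduler-1
onNext() was called on --> main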

RxAndroid: What's the difference between SubscribeOn and ObserveOn

In case you find the above answer full of jargon:

tl;dr

 Observable.just("Some string")                 
.map(str -> str.length())
.observeOn(Schedulers.computation())
.map(length -> 2 * length)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(---)

Subscribe to the observable... perform the first map function on an IO thread (since we are "subscribingOn" that thread)... now switch to a computation thread and perform the map(length -> 2 * length) function... and finally make sure you observe the output on the main thread (observeOn()).

Anyway,

observeOn() simply changes the thread of all operators further downstream. People often have the misconception that observeOn also acts upstream, but it doesn't.

The example below explains it better:

Observable.just("Some string")                  // UI
.map(str -> str.length()) // UI
.observeOn(Schedulers.computation()) // Changing the thread
.map(length -> 2 * length) // Computation
.subscribe(---) // Computation
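A hypothetical JDK-only model of that example (not RxJava; map, observeOn and DownstreamSketch are invented names), wiring the stages as plain Consumers. Only the stages after observeOn move to the "computation" thread; the first map stays on the thread that started the emission, which plays the UI thread here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

final class DownstreamSketch {
    static String thread() { return Thread.currentThread().getName(); }

    // Hypothetical push-style stage: logs its thread, then hands the result to the next Consumer.
    static <T, R> Consumer<T> map(String label, List<String> log, Function<T, R> f, Consumer<R> downstream) {
        return value -> {
            log.add(label + " on " + thread());
            downstream.accept(f.apply(value));
        };
    }

    // observeOn only affects what comes after it: it re-posts each value onto the executor.
    static <T> Consumer<T> observeOn(Executor executor, Consumer<T> downstream) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    static List<String> demo() {
        ExecutorService computation = Executors.newSingleThreadExecutor(r -> new Thread(r, "computation"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // Wired back to front: map(length) -> observeOn(computation) -> map(2 * length) -> subscriber
        Consumer<Integer> subscriber = n -> {
            log.add("subscribe on " + thread());
            done.countDown();
        };
        Consumer<Integer> doubled = map("map(2 * length)", log, (Integer length) -> 2 * length, subscriber);
        Consumer<Integer> switched = observeOn(computation, doubled);
        Consumer<String> head = map("map(length)", log, (String s) -> s.length(), switched);
        head.accept("Some string"); // emission starts on the calling thread, like Observable.just
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        computation.shutdown();
        return log;
    }
}
```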

subscribeOn() only influences the thread that is used when the Observable is subscribed to, and emissions stay on that thread downstream unless an observeOn() switches them.

Observable.just("Some String")              // Computation
.map(str -> str.length()) // Computation
.map(length -> 2 * length) // Computation
.subscribeOn(Schedulers.computation()) // -- changing the thread
.subscribe(number -> Log.d("", "Number " + number)); // Computation

Position does not matter (subscribeOn())

Why?
Because it affects only the time of subscription.

Methods that obey the contract with subscribeOn

Basic example: Observable.create

All the work specified inside the create body will run on the thread specified in subscribeOn.

Other examples: Observable.just, Observable.from and Observable.range.

Note: all those methods accept values, so do not use blocking methods to create those values, as subscribeOn won't affect where those values are computed.

If you want to use blocking functions, use

Observable.defer(() -> Observable.just(blockingMethod()));
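Why defer helps, sketched in plain Java. Source, just, defer and subscribeOn below are hypothetical stand-ins, not RxJava's classes, and blockingMethod() here merely reports the thread it ran on. The argument to just is evaluated immediately on the caller, while the supplier passed to defer runs only at subscribe time, on the subscribeOn thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class DeferSketch {
    // Hypothetical single-value source (not the RxJava API).
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    static <T> Source<T> just(T value) { return onNext -> onNext.accept(value); }

    // defer builds the real source only when someone subscribes.
    static <T> Source<T> defer(Supplier<Source<T>> factory) { return onNext -> factory.get().subscribe(onNext); }

    static <T> Source<T> subscribeOn(Source<T> source, Executor executor) {
        return onNext -> executor.execute(() -> source.subscribe(onNext));
    }

    // Hypothetical stand-in for blockingMethod(): reports which thread evaluated it.
    static String blockingMethod() { return Thread.currentThread().getName(); }

    // Returns {thread that ran blockingMethod() for just, thread that ran it for defer}.
    static String[] demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        AtomicReference<String> eager = new AtomicReference<>();
        AtomicReference<String> deferred = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(2);
        Source<String> justSource = just(blockingMethod());               // runs NOW, on the caller
        Source<String> deferSource = defer(() -> just(blockingMethod())); // runs at subscribe time
        subscribeOn(justSource, io).subscribe(v -> { eager.set(v); done.countDown(); });
        subscribeOn(deferSource, io).subscribe(v -> { deferred.set(v); done.countDown(); });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        io.shutdown();
        return new String[] { eager.get(), deferred.get() };
    }
}
```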

Important Fact:

subscribeOn does not work with Subjects: a Subject emits on whichever thread calls its onNext().

Multiple subscribeOn:

If there are multiple instances of subscribeOn in the stream, only the first one (the one closest to the source) has a practical effect.
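A JDK-only sketch of why (Subscribable and MultipleSubscribeOnSketch are hypothetical names, not RxJava). Each subscribeOn re-posts the subscribe call to its executor, so with two of them the call visits B first and then A, and the source ends up running on A, the executor closest to it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical: a source reduced to "something you can subscribe to" (not the RxJava API).
interface Subscribable {
    void subscribe();
}

final class MultipleSubscribeOnSketch {
    // Each subscribeOn just re-posts the subscribe call onto its executor.
    static Subscribable subscribeOn(Subscribable source, Executor executor) {
        return () -> executor.execute(source::subscribe);
    }

    // Returns the thread on which the source itself was finally subscribed.
    static String sourceSubscribedOn() {
        ExecutorService a = Executors.newSingleThreadExecutor(r -> new Thread(r, "A"));
        ExecutorService b = Executors.newSingleThreadExecutor(r -> new Thread(r, "B"));
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Subscribable source = () -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        };
        // Same shape as source.subscribeOn(A).subscribeOn(B): B only forwards the call to A.
        subscribeOn(subscribeOn(source, a), b).subscribe();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        a.shutdown();
        b.shutdown();
        return ranOn.get();
    }
}
```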

Subscribe & subscribeOn

People think that subscribeOn has something to do with Observable.subscribe, but it doesn't have anything special to do with it.
It only affects the subscription phase.

Source: Tomek Polański (Medium)

Should subscribeOn and observeOn only be invoked by the final subscriber?

Great to see you have read the book and are taking the time to challenge some of the guidance there.

The reason I give this guidance is because

  1. Concurrency is hard, and having a simple rule helps teams produce better code
  2. Concurrency is hard, and having a single place to locate your concurrency concerns allows for a better mental model of your stack/layering and should simplify testing. The more layers that introduce concurrency in your application, the worse it gets.
  3. Blocking the UI thread is not good news. Getting off the UI thread as soon as possible, and delaying any processing of data back on the UI thread until as late as possible, is preferable. This pattern aims to serve that goal.

These are obviously my opinions, but I have seen these simple guidelines help clean up code on dozens of projects, reduce code bases, improve testability, improve predictability and, in numerous cases, massively improve performance.

Sadly, it is difficult to put together case studies of these projects as most of them are protected by NDAs.

I would be keen to see how it works for you or how you apply an alternate pattern.

Why should I subscribe on the main thread?

subscribeOn vs. observeOn

subscribeOn will call the create method of the Observable on the given Scheduler. It does not matter how many times you use subscribeOn: the first subscribeOn applied to the source observable (first in the chain) always wins.

observeOn will switch threads from operator to operator. When upstream emits a value on Thread X, it will be switched over to Thread Y from the Scheduler given to the observeOn operator. Everything below observeOn will now be processed on Thread Y.

Best guess on the provided Example 1:
Using subscribeOn will call Observable#create on Schedulers#io. Everything in the create lambda will be called on this Schedulers#io thread. The listener callback (onTextChanged) can actually happen on another thread; in this case it is the UI thread, because the listener belongs to a UI element. Now onNext will be called from the UI thread (emitter.onNext(it)). The value will be emitted to the #map operator on the UI thread (.map { cheeseSearchEngine.search(it) }), and cheeseSearchEngine#search will block the UI thread.

Example 2:
It uses ".subscribeOn(AndroidSchedulers.mainThread())" as its first operator. This actually has no effect, because you are already on the UI thread. In this case the create lambda will be called from AndroidSchedulers#mainThread. The onNext will be emitted on the UI thread as well, just as in Example 1, because the UI triggers the onTextChanged event. The value will then be put through observeOn(Schedulers.io()). Everything from the observeOn point onwards will be executed on a Schedulers#io thread, so the UI is not blocked when map does some HTTP request (or some other long-running IO). After map is finished and emits the next value downstream, the next observeOn(AndroidSchedulers.mainThread()) will switch back to the UI thread. Therefore you can now safely change the UI in the subscribe lambda, because you are on the UI thread.
In conclusion, the first subscribeOn in Example 2 can be omitted if it does not matter which thread the listener registration happens on (the listener registration probably has to be thread-safe).

Summary:
Using subscribeOn will only invoke the create lambda on the given Scheduler's thread. The callback from the listener registered in create may happen on another thread.
This is why Example 1 will block the UI thread and Example 2 will not.
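The same mechanism in a hypothetical JDK-only model (FakeTextView and ListenerSketch are invented; real Android widgets differ). The create body, and therefore the listener registration, runs on "io", but the events, and any work done synchronously after onNext such as the search in map, run on "ui".

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ListenerSketch {
    // Hypothetical widget: listeners may be added from any thread, but events always fire on "ui".
    static final class FakeTextView {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        private final ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        void addTextChangedListener(Consumer<String> listener) { listeners.add(listener); }
        void type(String text) { ui.execute(() -> listeners.forEach(l -> l.accept(text))); }
        void close() { ui.shutdown(); }
    }

    static String thread() { return Thread.currentThread().getName(); }

    // Returns {thread that ran the create body, thread that ran the work after onNext}.
    static String[] demo() {
        FakeTextView view = new FakeTextView();
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        AtomicReference<String> createRanOn = new AtomicReference<>();
        AtomicReference<String> searchRanOn = new AtomicReference<>();
        CountDownLatch registered = new CountDownLatch(1);
        CountDownLatch searched = new CountDownLatch(1);
        // Models create { addListener(emitter::onNext) } combined with subscribeOn(io).
        io.execute(() -> {
            createRanOn.set(thread());
            view.addTextChangedListener(text -> { // emitter.onNext(text), then map { search(it) }
                searchRanOn.set(thread());        // a blocking search would block this thread
                searched.countDown();
            });
            registered.countDown();
        });
        try {
            registered.await(5, TimeUnit.SECONDS);
            view.type("cheese");
            searched.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        io.shutdown();
        view.close();
        return new String[] { createRanOn.get(), searchRanOn.get() };
    }
}
```

ListenerSketch.demo() returns {"io", "ui"}, which is why inserting observeOn(Schedulers.io()) before the map, as Example 2 does, is what actually moves the search off the UI thread.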


