What Are the Default Schedulers for Each Observable Operator

What are the default Schedulers for each observable operator?

Wow, that was not trivial to find...

Deep within the bowels of the System.Reactive.Concurrency namespace, there is an internal static class called SchedulerDefaults, which is declared as:

internal static class SchedulerDefaults
{
internal static IScheduler AsyncConversions
{ get { return DefaultScheduler.Instance; }}

internal static IScheduler ConstantTimeOperations
{ get { return ImmediateScheduler.Instance; }}

internal static IScheduler Iteration
{ get { return CurrentThreadScheduler.Instance; }}

internal static IScheduler TailRecursion
{ get { return ImmediateScheduler.Instance; }}

internal static IScheduler TimeBasedOperations
{ get { return DefaultScheduler.Instance; }}
}

AsyncConversions is used by:

Start, ToAsync, FromAsyncPattern

ConstantTimeOperations is used by:

Empty, GetSchedulerForCurrentContext, Return, StartWith, Throw

Iteration is used by:

Generate, Range, Repeat, TakeLast, ToObservable, and the ReplaySubject<T>

TailRecursion is used by:

Run

TimeBasedOperations is used by:

Buffer, Delay, DelaySubscription, Generate, Interval, Sample, Skip, SkipLast
SkipUntil, Take, TakeLast, TakeLastBuffer, TakeUntil, Throttle, TimeInterval,
Timeout, Timer, Timestamp, Window

Choosing 'default' Schedulers for my Observable extension methods

Summary

In general, you should provide scheduler overloads if your operators schedule future events (e.g. call Schedule with a due time, or wait for some further event to respond) or if they produce events iteratively.

If the source stream is blocking in nature, that's really the business of the source stream. Whether or not you resolve the blocking depends on your extension - but you shouldn't turn a non-blocking stream into a blocking stream without providing an option (via a scheduler parameter) to avoid this.

Details

The question you cited is a great place to start. To recap, you can see the framework classifies it's operators into 5 classes:

  • AsyncConversions
  • ConstantTimeOperations
  • Iteration
  • TailRecursion
  • TimeBasedOperations

However, I think a tweak to these categories makes them more useful for this discussion:

  • Produces Future Events (e.g. Buffer, Window, Delay)
  • Iterates (e.g. Range, Generate, Repeat)
  • Other

Produces Future Events

The operators provided by the Rx library that fall into this category will always allow you to specify a scheduler. If you don't, they will use Scheduler.Default which will use the platform default scheduler which always has non-blocking semantics. Essentially these are operators that will schedule events at some future point. To be clear, by future point I mean that they will react to an incoming event by sending an event at some point in the future and/or by calling Schedule specifying a due time.

Because of this, you'll almost never want them to block, so it makes sense to use a scheduler that will run on a different thread (or whatever the platform equivalent of that is).

Iterates

Iterating operators are those that will produce a number of events - that may or may not be future scheduled. For example, the Range operator schedules it's events for immediate dispatch. However, a naïve implementation that pushed the whole range onto the observer immediately would be problematic - since it takes some non-zero amount of time to produce a range you don't want to remove the option of scheduling the range to some specified scheduler, especially in the case of large ranges. Furthermore, you want to be careful exactly how you push events onto the scheduler - if you dumped the whole range in a single loop in one scheduled action, you could unfairly deprive access to the scheduler with nasty consequences for time-critical scheduled actions using that scheduler (especially if it is single threaded).

So iterating operators like Range, are implemented so that each iteration is responsible for scheduling the next iteration. This means that their behavior can vary a lot depending on the scheduler they use - with an immediate scheduler you would see blocking, with the dispatcher scheduler you wouldn't see blocking or starvation because events would be interleaved with other dispatch operations.

The Other Category

The remaining category is for those operators transform or produce events immediately - and therefore they are non-blocking in themselves. These operators will often not provide an overload to allow you to specify a scheduler - they will just produce events using the thread they are called on - which can still vary over their lifetime and will depend on the stream they are applied to. If they originate events in constant time (like Observable.Return), they will usually complete before the Subscribe call returns.

How to decide

So to answer your question, you really need to consider the entire operator chain to make appropriate decisions. Each link in the chain can potentially introduce future or iterative scheduling. Furthermore, IObservable<T> does not reveal how individual operators behave -
whether or not a given operator includes overloads with schedule parameters is a good clue (and quite reliable for the built-in operators), but no guarantee.

There is good advice in the guidelines (an essential read) to help you decide whether to use or provide a scheduler overload. Here are the relevant extracts:

5.4. Consider passing a specific scheduler to concurrency introducing operators
Rather than using the ObserveOn operator to change the execution context on which the observable sequence produces messages, it is better to create concurrency in the right place to begin with. As operators parameterize introduction of concurrency by providing a scheduler argument overload, passing the right scheduler will lead to fewer places where the ObserveOn operator has to be used.

[...]

When to ignore this guideline
When combining several events that originate on different execution contexts, use guideline 5.5 to put all messages on a specific execution context as late as possible.

5.5. Call the ObserveOn operator as late and in as few places as possible
By using the ObserveOn operator, an action is scheduled for each message that comes through the original observable sequence. This potentially changes timing information as well as puts additional stress on the system. Placing this operator later in the query will reduce both concerns.

[...]

When to ignore this guideline
Ignore this guideline if your use of the observable sequence is not bound to a specific execution context. In that case do not use the ObserveOn operator.

6.12. Avoid introducing concurrency
By adding concurrency, we change the timeliness of an observable sequence. Messages will be scheduled to arrive later. The time it takes to deliver a message is data itself, by adding concurrency we skew that data.

[...]

When to ignore this guideline
Ignore this guideline in situations where introduction of concurrency is an essential part of what the operator does.
NOTE: When we use the Immediate scheduler or call the observer directly from within the call to Subscribe, we make the Subscribe call blocking. Any expensive computation in this situation would indicate a candidate for introducing concurrency.

6.14. Operators should not block
Rx is a library for composing asynchronous and event-based programs using observable collections.
By making an operator blocking we lose these asynchronous characteristics. We also potentially loose composability (e.g. by returning a value typed as T instead of IObservable).

As a rule of thumb, I provide scheduler options in my overloads if and only if I use a scheduler in the implementation - either because I directly use one or because I call an operator that has a scheduler overload. I always use the scheduler overloads if they are available. This is so I can pass a TestScheduler (in nuget packing rx-testing) for unit testing purposes.

It's always worth using something like my Spy method to determine whether the Subscribe call returns immediately and the OnXXX methods of the observer are called on a different thread, or if the operator completes immediately in constant time running events on the calling thread and returning from the Subscribe call almost immediately (as in Return) - if that's the case, the given configuration is non-blocking and Scheduler.Default is not required.

For your specific examples the source stream is the critical factor.

In both examples you can block by providing a current thread scheduler. However, whether or not you will block depends on whether the source stream's Subscribe is blocking or non-blocking, and on what scheduler it schedule's it's events.

It also depending on what you mean by "sticking" - if you are are talking about Subscribe sticking only, then you only need check (working from the last operator applied backwards) if a blocking Subscribe appears before a non-blocking one. If you are talking about mid-stream events causing blocking, you'll need to think about how those operators work and:

In general, you should provide scheduler overloads if your operators schedule future events (e.g. call Schedule with a due time, or wait for some further event to respond) or if they produce events iteratively.

If the source stream is blocking in nature, that's really the business of the source stream. Whether or not you resolve the blocking depends on your extension - but you shouldn't turn a non-blocking stream into a blocking stream without providing an option (via a scheduler parameter) to avoid this.

Cold observable's Scheduler.CurrentThread - in which thread will it run?

Why not do run some tests to find out?

Here's what I did:

Console.WriteLine(Thread.CurrentThread.ManagedThreadId);

Observable
.Timer(TimeSpan.FromSeconds(2.0), Scheduler.CurrentThread)
.Subscribe(x => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));

The produced:


12
12

Then I tried this:

Console.WriteLine(Thread.CurrentThread.ManagedThreadId);

Observable
.Timer(TimeSpan.FromSeconds(2.0))
.Subscribe(x => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));

That produced:


12
13
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);

Observable
.Timer(TimeSpan.FromSeconds(2.0))
.ObserveOn(Scheduler.CurrentThread)
.Subscribe(x => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));

That produced:


11
27

All of this boils down to Scheduler.CurrentThread capturing the current thread context at the time that the Scheduler.CurrentThread parameter is evaluated.

In the first block of code it was captured at the time that the timer was created - in other words, my console thread.

In the final block it was captured after the timer fired so it captured the thread that the timer fired on.

Rx when debugging how do i know which scheduler is being used?

By default, the operators pick a scheduler that introduces the least concurrency required for the operator.

If you want to know what is being used in the debugger, then put a break point in your observer and look at the stack trace.



Related Topics



Leave a reply



Submit