Write an Rx "RetryAfter" Extension Method


The key to this implementation of a back-off retry is deferred observables. A deferred observable won't execute its factory until someone subscribes to it, and it will invoke the factory once for each subscription, which makes it ideal for our retry scenario.
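To see the per-subscription behavior in isolation, here is a minimal C# sketch. It assumes the System.Reactive NuGet package; DeferDemo and CountFactoryCalls are hypothetical names used only for illustration. Subscribing twice runs the factory twice.

```csharp
using System;
using System.Reactive.Linq;

public static class DeferDemo
{
    // Subscribes twice to a deferred observable and reports how often its factory ran.
    public static int CountFactoryCalls()
    {
        int calls = 0;

        // Nothing runs here: Defer only stores the factory.
        IObservable<int> deferred = Observable.Defer(() =>
        {
            calls++; // executed once per subscription
            return Observable.Return(calls);
        });

        using (deferred.Subscribe(_ => { })) { } // first subscription runs the factory
        using (deferred.Subscribe(_ => { })) { } // second subscription runs it again
        return calls;
    }
}
```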

Assume we have a method which triggers a network request.

public IObservable<WebResponse> SomeApiMethod() { ... }

For the purposes of this little snippet, let's define the deferred observable as source:

var source = Observable.Defer(() => SomeApiMethod());

Whenever someone subscribes to source, it will invoke SomeApiMethod and launch a new web request. The naive way to retry it whenever it fails would be to use the built-in Retry operator:

source.Retry(4)

That wouldn't be very nice to the API, though, and it's not what you're asking for. We need to delay the launching of requests in between each attempt. One way of doing that is with a delayed subscription:

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

That's not ideal, since it'll add the delay even on the first request; let's fix that.

int attempt = 0;
Observable.Defer(() =>
{
    // no delay on the first attempt, one second before every retry
    return (++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1));
})
.Retry(4)
.Select(response => ...)

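Just pausing for a second isn't a very good retry strategy, though, so let's change that constant into a function which receives the retry count and returns an appropriate delay. A growing back off is easy enough to implement. Strictly speaking, Math.Pow(n, 2) below grows quadratically (1, 4, 9 seconds...); Math.Pow(2, n) would give true exponential growth (2, 4, 8 seconds...).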

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
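To make the resulting schedule concrete, the following standalone C# sketch (plain .NET, no Rx needed) computes the delay applied before each attempt. BackoffDelays, DelayBefore and the Doubling variant are hypothetical names; Squared is the strategy from the text.

```csharp
using System;

public static class BackoffDelays
{
    // The strategy from the text: n squared, in seconds (1, 4, 9, ...).
    public static readonly Func<int, TimeSpan> Squared = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    // A doubling alternative (2, 4, 8, ...) for true exponential growth.
    public static readonly Func<int, TimeSpan> Doubling = n => TimeSpan.FromSeconds(Math.Pow(2, n));

    // Delay before a given 1-based attempt, mirroring
    // (++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)).
    public static TimeSpan DelayBefore(int attempt, Func<int, TimeSpan> strategy) =>
        attempt == 1 ? TimeSpan.Zero : strategy(attempt - 1);
}
```

With Squared, attempts 1 to 4 wait 0, 1, 4 and 9 seconds; with Doubling they wait 0, 2, 4 and 8 seconds.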

We're almost done now; we just need to add a way of specifying which exceptions we should retry on. Let's add a function that, given an exception, returns whether or not it makes sense to retry; we'll call it retryOnError.

Now we need to write some scary-looking code, but bear with me.

Observable.Defer(() =>
{
    return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<WebResponse>(t.Item3))

All of those angle brackets are there to marshal an exception that we shouldn't retry past the .Retry(). We've made the inner observable an IObservable<Tuple<bool, WebResponse, Exception>>, where the bool indicates whether we have a response (true) or an exception (false). If retryOnError indicates that we should retry for a particular exception, the inner observable will throw, and that will be picked up by the Retry. Otherwise the exception is wrapped in the Tuple and travels past the Retry as an ordinary value. The SelectMany just unwraps our Tuple, rethrowing any wrapped exception, and makes the resulting observable an IObservable<WebResponse> again.

See my gist with full source and tests for the final version. Having this operator allows us to write our retry code quite succinctly:

Observable.Defer(() => SomeApiMethod())
    .RetryWithBackoffStrategy(
        retryCount: 4,
        retryOnError: e => e is ApiRetryWebException
    )
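The gist itself isn't reproduced here, but the snippets above can be assembled into an extension method along these lines. This is a sketch, not the gist's code: it assumes System.Reactive, generalizes WebResponse to T, and defaults strategy to the squared back off and retryOnError to "always retry". The extra outer Defer gives each subscriber its own attempt counter.

```csharp
using System;
using System.Reactive.Linq;

public static class RetryExtensions
{
    // Sketch assembled from the snippets above; parameter defaults are assumptions.
    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null)
    {
        strategy = strategy ?? (n => TimeSpan.FromSeconds(Math.Pow(n, 2)));
        retryOnError = retryOnError ?? (_ => true);

        // Outer Defer: a fresh attempt counter per subscriber.
        return Observable.Defer(() =>
        {
            int attempt = 0;
            return Observable.Defer(() =>
                    (++attempt == 1 ? source : source.DelaySubscription(strategy(attempt - 1)))
                        // success is wrapped as (true, value, null)
                        .Select(item => Tuple.Create(true, item, (Exception)null))
                        // retryable errors are rethrown for Retry; the others are wrapped
                        .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                            ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                            : Observable.Return(Tuple.Create(false, default(T), e))))
                .Retry(retryCount)
                // unwrap: emit the value, or rethrow the non-retryable error
                .SelectMany(t => t.Item1
                    ? Observable.Return(t.Item2)
                    : Observable.Throw<T>(t.Item3));
        });
    }
}
```

Note that Retry(retryCount) counts the first subscription, so retryCount: 4 means at most four attempts in total.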

How to write a generic, recursive extension method in F#?

You can use it as an extension method once you finish the type declaration. So you can write the method like this:

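open System
open System.Reactive.Concurrency
open System.Reactive.Linq
open System.Runtime.CompilerServices

[<Extension>]
type ObservableExtensions =

    [<Extension>]
    static member retryAfterDelay(source: IObservable<'T>, retryDelay: TimeSpan, retryCount, scheduler: IScheduler): IObservable<'T> =
        if retryCount <= 0 then source // base case: without it the recursion never terminates
        else ObservableExtensions.retryAfterDelay(source.Catch(fun (_: exn) -> source.DelaySubscription(retryDelay, scheduler)), retryDelay, retryCount - 1, scheduler)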

After that you can use it immediately. If you need it in another extension in the same class, you can use it by reopening the type declaration:

type ObservableExtensions with
    [<Extension>]
    static member anotherExtension (x: IObservable<_>) = x.retryAfterDelay // now you can use it as an extension method

The alternative is to use let rec for an inner function:

    [<Extension>]
    static member retryAfterDelay(source: IObservable<'T>, retryDelay: TimeSpan, retryCount, scheduler: IScheduler): IObservable<'T> =
        let rec go (source: IObservable<'T>, retryDelay: TimeSpan, retryCount, scheduler: IScheduler): IObservable<'T> =
            if retryCount <= 0 then source // base case: stops the recursion
            else go (source.Catch(fun (_: exn) -> source.DelaySubscription(retryDelay, scheduler)), retryDelay, retryCount - 1, scheduler)
        go (source, retryDelay, retryCount, scheduler)

I prefer this version in F#, since the recursion is explicit in the code.

Rx back off and retry

It's because you are putting a Delay on the result stream. (The value for n passed to ExponentialBackoff on the second iteration is 1, giving a delay of 1 second.)

Delay operates on source, but the source proceeds as normal. Delay schedules the results it receives to be emitted after the specified duration. So the subscriber is getting the results after the logic of Generate has run to completion.

If you think about it, this is how Delay must be; otherwise Delay would somehow be able to interfere with upstream operators!

It is possible to interfere with upstream operators (without throwing exceptions), by being a slow consumer. But that would certainly be a very bad way for a simple Delay to behave.

I don't think Delay is what you intend here, because Delay doesn't delay its subscription. If you use DelaySubscription instead, I think you'll get what you're after. This is what's used in the linked question too.

Your question provides a great illustration of the difference between Delay and DelaySubscription! It's worth thinking about Defer in here too.

The distinction between these three is subtle but significant, so let's summarize all three:

  • Delay - Calls the target operator immediately to get an IObservable. On its Subscribe, it calls Subscribe on the target immediately and schedules events for delivery after the specified delay on the specified Scheduler.

  • DelaySubscription - Calls the target operator immediately to get an IObservable. On its Subscribe, it schedules the Subscribe call on the target for execution after the specified delay on the specified Scheduler.

  • Defer - Has no target operator. On Subscribe, it runs the provided factory function to get the target IObservable and immediately calls Subscribe on it. No delay is added, hence there is no Scheduler to specify.
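The difference between Delay and DelaySubscription can be observed directly. This C# sketch (assuming System.Reactive; the class and method names are hypothetical) uses Defer to record the moment the source is actually subscribed to, then checks whether that happened within 100 ms when the query is delayed by 500 ms.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class DelayVsDelaySubscription
{
    // Reports whether the source was subscribed to within 100 ms of calling Subscribe.
    public static bool SourceSubscribedImmediately(bool useDelaySubscription)
    {
        using (var subscribed = new ManualResetEventSlim(false))
        {
            // Defer lets us observe the moment the source is actually subscribed to.
            IObservable<int> source = Observable.Defer(() =>
            {
                subscribed.Set();
                return Observable.Return(1);
            });

            TimeSpan delay = TimeSpan.FromMilliseconds(500);
            IObservable<int> query = useDelaySubscription
                ? source.DelaySubscription(delay) // postpones the Subscribe call itself
                : source.Delay(delay);            // subscribes now, postpones only the values

            // Disposing cancels any still-pending delayed subscription.
            using (query.Subscribe(_ => { }))
            {
                return subscribed.Wait(TimeSpan.FromMilliseconds(100));
            }
        }
    }
}
```

On a normally loaded machine, Delay subscribes to the source straight away, so the method returns true; with DelaySubscription the subscription is still pending after 100 ms, so it returns false.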

Use Rx Start, Retry, Delay, Wait for synchronous file delete retry

This won't work the way you want. There are three problems:

  • Delay doesn't work how you think - it delays passing on the events, but the source still runs immediately.
  • You are issuing the Retry before the Delay
  • You need to use Defer to create a factory because Start will only call the embedded function once on evaluation.

Have a look at this answer for more detail on Delay and why DelaySubscription is better: Rx back off and retry.

This answer has a good implementation of a back-off retry: Write an Rx "RetryAfter" extension method

A simple fix for your code could be this, which catches the exception and rethrows it after a delay - but there's no delay if it works:

Observable.Defer(() => Observable.Start(() => File.Delete(path)))
.Catch((Exception ex) =>
Observable.Throw<Unit>(ex)
.DelaySubscription(TimeSpan.FromMilliseconds(500)))
.Retry(2)
.Wait();

Do have a look at the second link above for a fuller and better implementation though.

I kept the code above simple to make the point, so it isn't perfect: it always delays the exception, for example.

You really want to have the DelaySubscription on the action, with its delay time calculated dynamically from the number of retries, which is what the linked implementation does.
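A minimal sketch of that idea, assuming System.Reactive; FileRetry, DeleteWithRetry and the linear baseDelay * (attempt - 1) schedule are hypothetical choices, not the linked implementation. The inner Defer matters because Observable.Start runs its action as soon as it is called, so without it DelaySubscription would delay only the notification, not the delete.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class FileRetry
{
    // Hypothetical helper: deletes a file, waiting a little longer before each retry.
    public static void DeleteWithRetry(string path, int maxAttempts, TimeSpan baseDelay)
    {
        int attempt = 0;
        Observable.Defer(() =>
            {
                attempt++;
                // Defer postpones Observable.Start (which runs its action immediately
                // when called) until the possibly delayed subscription happens.
                var delete = Observable.Defer(() => Observable.Start(() => File.Delete(path)));
                // No delay on the first attempt; baseDelay * (attempt - 1) afterwards.
                return attempt == 1
                    ? delete
                    : delete.DelaySubscription(TimeSpan.FromTicks(baseDelay.Ticks * (attempt - 1)));
            })
            .Retry(maxAttempts) // total number of attempts, including the first
            .Wait();            // blocks; rethrows the last error if every attempt fails
    }
}
```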

Reactive Extensions and Retry

Yes, Rx is generally asynchronous, so when writing tests you need to wait for it to finish (otherwise Main just exits right after your call to Subscribe).

Also, make sure you subscribe to the observable produced by calling source.RetryWithBackoffStrategy(...). That produces a new observable that has the retry semantics.

Easiest solution in cases like this is to literally use Wait:

try
{
    var source2 = source.RetryWithBackoffStrategy(/*...*/);

    // blocks the current thread until the source finishes
    var result = source2.Wait();
    Console.WriteLine("result=" + result);
}
catch (Exception err)
{
    Console.WriteLine("uh oh: " + err);
}

If you use something like NUnit (which supports asynchronous tests) to write your tests, then you can do:

[Test]
public async Task MyTest()
{
var source = // ...;
var source2 = source.RetryWithBackoffStrategy(/*...*/);
var result = await source2; // you can await observables
Assert.That(result, Is.EqualTo(5));
}

Stuck with Rx Observable SelectMany

As Brandon mentioned, there was no synchronization/blocking after defining the observable's behavior. So I dealt with it by replacing the "Subscribe" call with "ForEachAsync", which turns that Observable into a Task, and blocking the caller with the Task's "Wait" method:

files.ToObservable().SelectMany(f =>
{
    var source = Observable.Defer(() => Observable.Start(() =>
    {
        ftpConnection.DownloadFile(avroPath, f.Name);
        return Tuple.Create(true, f.Name);
    }));
    int attempt = 0;
    return Observable.Defer(() => ((++attempt == 1)
            ? source
            : source.DelaySubscription(TimeSpan.FromSeconds(1))))
        .Retry(4)
        .Catch(Observable.Return(Tuple.Create(false, f.Name)));
}).ForEachAsync(res =>
{
    if (res.Item1) Process(res.Item2);
    else LogOrQueueOrWhatever(res.Item2);
}).Wait();

ProcessLogs();
ScheduleNExtDownloadRoutine();

Translating a piece of asynchronous C# code to F# (with Reactive Extensions and FSharpx)

If you don't absolutely have to use Observable.Create, you can achieve similar results with Observable.Interval:

type Observable with
    static member Poll(f : unit -> IObservable<_>, interval : TimeSpan, sched : IScheduler) : IObservable<_> =
        Observable.Interval(interval, sched)
            .SelectMany(fun _ ->
                Observable.Defer(f)
                    .Select(Choice1Of2)
                    .Catch(Choice2Of2 >> Observable.Return))

    // An overload that matches your original function
    static member Poll(f : 'a -> IObservable<_>, argFactory : unit -> 'a, interval : TimeSpan, sched : IScheduler) =
        Observable.Poll(argFactory >> f, interval, sched)

What I like about this implementation is that you don't have to go down to the level of directly using schedulers and Observable.Create. I think you should always use existing combinators/operators unless you absolutely have to do otherwise.

Also, Observable.Interval uses SchedulePeriodic (evident here) which is probably more efficient and correct than your Task-based implementation.

In RxJava, how to retry/resume on error, instead of completing the observable

You don't have to handle errors on your mViewPeriodSubject; instead, handle them on your Retrofit Observable. That Retrofit Observable won't resume, but at least it won't affect your "main" Observable.

mAdapterObservable =
    mViewPeriodSubject
        .flatMap(period -> MyRetrofitAPI.getService().fetchData(period).onErrorResumeNext(e -> Observable.empty())) // this might fail
        .flatMap(Observable::from)
        .map(MyItem::modifyItem)
        .toList()
        .map(obj -> new MyAdapter(obj));

Choosing 'default' Schedulers for my Observable extension methods

Summary

In general, you should provide scheduler overloads if your operators schedule future events (e.g. call Schedule with a due time, or wait for some further event to respond) or if they produce events iteratively.

If the source stream is blocking in nature, that's really the business of the source stream. Whether or not you resolve the blocking depends on your extension - but you shouldn't turn a non-blocking stream into a blocking stream without providing an option (via a scheduler parameter) to avoid this.

Details

The question you cited is a great place to start. To recap, you can see the framework classifies its operators into 5 classes:

  • AsyncConversions
  • ConstantTimeOperations
  • Iteration
  • TailRecursion
  • TimeBasedOperations

However, I think a tweak to these categories makes them more useful for this discussion:

  • Produces Future Events (e.g. Buffer, Window, Delay)
  • Iterates (e.g. Range, Generate, Repeat)
  • Other

Produces Future Events

The operators provided by the Rx library that fall into this category will always allow you to specify a scheduler. If you don't, they will use Scheduler.Default which will use the platform default scheduler which always has non-blocking semantics. Essentially these are operators that will schedule events at some future point. To be clear, by future point I mean that they will react to an incoming event by sending an event at some point in the future and/or by calling Schedule specifying a due time.

Because of this, you'll almost never want them to block, so it makes sense to use a scheduler that will run on a different thread (or whatever the platform equivalent of that is).

Iterates

Iterating operators are those that will produce a number of events, which may or may not be scheduled in the future. For example, the Range operator schedules its events for immediate dispatch. However, a naïve implementation that pushed the whole range onto the observer immediately would be problematic: since it takes some non-zero amount of time to produce a range, you don't want to remove the option of scheduling the range on some specified scheduler, especially in the case of large ranges. Furthermore, you want to be careful exactly how you push events onto the scheduler. If you dumped the whole range in a single loop in one scheduled action, you could unfairly deprive other actions of access to the scheduler, with nasty consequences for time-critical scheduled actions using that scheduler (especially if it is single threaded).

So iterating operators like Range are implemented so that each iteration is responsible for scheduling the next iteration. This means that their behavior can vary a lot depending on the scheduler they use: with an immediate scheduler you would see blocking, while with the dispatcher scheduler you wouldn't see blocking or starvation, because events would be interleaved with other dispatch operations.
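The scheduler dependence is easy to check. This C# sketch (assuming System.Reactive; RangeScheduling and AllValuesOnCallingThread are hypothetical names) records whether every value from Observable.Range arrives on the subscribing thread.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RangeScheduling
{
    // Runs Observable.Range on the given scheduler and reports whether every
    // OnNext call happened on the thread that subscribed.
    public static bool AllValuesOnCallingThread(IScheduler scheduler)
    {
        int callerThread = Environment.CurrentManagedThreadId;
        bool allOnCaller = true;

        Observable.Range(0, 10, scheduler)
            .Do(_ =>
            {
                if (Environment.CurrentManagedThreadId != callerThread)
                    allOnCaller = false;
            })
            .Wait(); // block until Range completes, whichever thread it runs on

        return allOnCaller;
    }
}
```

With Scheduler.Immediate it returns true, because Range runs inside the Subscribe call; with Scheduler.Default the values are produced on another thread, so it returns false.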

The Other Category

The remaining category is for those operators that transform or produce events immediately, and are therefore non-blocking in themselves. These operators will often not provide an overload to allow you to specify a scheduler; they will just produce events using the thread they are called on, which can still vary over their lifetime and will depend on the stream they are applied to. If they originate events in constant time (like Observable.Return), they will usually complete before the Subscribe call returns.
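A quick way to confirm this for Observable.Return is to check whether OnCompleted has already fired when Subscribe returns. A sketch assuming System.Reactive, with hypothetical names:

```csharp
using System;
using System.Reactive.Linq;

public static class ConstantTimeDemo
{
    // Returns true if Observable.Return had already completed by the time Subscribe returned.
    public static bool CompletedBeforeSubscribeReturned()
    {
        bool completed = false;
        using (Observable.Return(42).Subscribe(_ => { }, () => completed = true))
        {
            // Subscribe has returned; Return ran its whole sequence on this thread.
            return completed;
        }
    }
}
```

It returns true with the overload of Return that takes no scheduler, which does not introduce any concurrency.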

How to decide

So to answer your question, you really need to consider the entire operator chain to make appropriate decisions. Each link in the chain can potentially introduce future or iterative scheduling. Furthermore, IObservable<T> does not reveal how individual operators behave; whether or not a given operator includes overloads with scheduler parameters is a good clue (and quite reliable for the built-in operators), but no guarantee.

There is good advice in the guidelines (an essential read) to help you decide whether to use or provide a scheduler overload. Here are the relevant extracts:

5.4. Consider passing a specific scheduler to concurrency introducing operators
Rather than using the ObserveOn operator to change the execution context on which the observable sequence produces messages, it is better to create concurrency in the right place to begin with. As operators parameterize introduction of concurrency by providing a scheduler argument overload, passing the right scheduler will lead to fewer places where the ObserveOn operator has to be used.

[...]

When to ignore this guideline
When combining several events that originate on different execution contexts, use guideline 5.5 to put all messages on a specific execution context as late as possible.

5.5. Call the ObserveOn operator as late and in as few places as possible
By using the ObserveOn operator, an action is scheduled for each message that comes through the original observable sequence. This potentially changes timing information as well as puts additional stress on the system. Placing this operator later in the query will reduce both concerns.

[...]

When to ignore this guideline
Ignore this guideline if your use of the observable sequence is not bound to a specific execution context. In that case do not use the ObserveOn operator.

6.12. Avoid introducing concurrency
By adding concurrency, we change the timeliness of an observable sequence. Messages will be scheduled to arrive later. The time it takes to deliver a message is data itself, by adding concurrency we skew that data.

[...]

When to ignore this guideline
Ignore this guideline in situations where introduction of concurrency is an essential part of what the operator does.
NOTE: When we use the Immediate scheduler or call the observer directly from within the call to Subscribe, we make the Subscribe call blocking. Any expensive computation in this situation would indicate a candidate for introducing concurrency.

6.14. Operators should not block
Rx is a library for composing asynchronous and event-based programs using observable collections.
By making an operator blocking we lose these asynchronous characteristics. We also potentially loose composability (e.g. by returning a value typed as T instead of IObservable).

As a rule of thumb, I provide scheduler options in my overloads if and only if I use a scheduler in the implementation, either because I directly use one or because I call an operator that has a scheduler overload. I always use the scheduler overloads if they are available. This is so I can pass a TestScheduler (in the NuGet package rx-testing) for unit-testing purposes.
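As an illustration of that rule of thumb, here is a hypothetical RetryAfterDelay operator in C# (assuming System.Reactive; the name and the fixed-delay policy are illustrative only). Because it calls DelaySubscription, it exposes a scheduler overload and passes the scheduler straight through, while the overload without a scheduler falls back to Scheduler.Default.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RetryAfterDelayExtensions
{
    // Convenience overload: falls back to Scheduler.Default, like the built-in time-based operators.
    public static IObservable<T> RetryAfterDelay<T>(this IObservable<T> source, TimeSpan delay, int retryCount) =>
        source.RetryAfterDelay(delay, retryCount, Scheduler.Default);

    // Scheduler overload: the scheduler is handed to DelaySubscription, the only
    // place where this operator schedules anything.
    public static IObservable<T> RetryAfterDelay<T>(this IObservable<T> source, TimeSpan delay, int retryCount, IScheduler scheduler) =>
        Observable.Defer(() =>
        {
            int attempt = 0; // per-subscriber counter
            return Observable.Defer(() => ++attempt == 1
                    ? source                                   // first attempt: no delay
                    : source.DelaySubscription(delay, scheduler)) // retries: delayed subscribe
                .Retry(retryCount);                            // total attempts, including the first
        });
}
```

In a unit test, the scheduler overload is where a TestScheduler would be passed in.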

It's always worth using something like my Spy method to determine whether the Subscribe call returns immediately and the OnXXX methods of the observer are called on a different thread, or if the operator completes immediately in constant time running events on the calling thread and returning from the Subscribe call almost immediately (as in Return) - if that's the case, the given configuration is non-blocking and Scheduler.Default is not required.

For your specific examples the source stream is the critical factor.

In both examples you can block by providing a current thread scheduler. However, whether or not you will block depends on whether the source stream's Subscribe is blocking or non-blocking, and on which scheduler it schedules its events.

It also depends on what you mean by "sticking". If you are talking about Subscribe sticking only, then you only need to check (working backwards from the last operator applied) whether a blocking Subscribe appears before a non-blocking one. If you are talking about mid-stream events causing blocking, you'll need to think about how each of those operators works, applying the guidance in the summary above.


