How can I implement an exhaustMap handler in Rx.Net?
Here is an implementation of the ExhaustMap operator. The source observable is projected to an IObservable<Task<TResult>>, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged operator, and finally the observable is flattened with the Concat operator.
/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
{
    return source
        .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
        {
            return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
        })
        .DistinctUntilChanged()
        .Concat();

    async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}
The tasks returned by the function are not guaranteed to be distinct, hence the need for the HideIdentity local function that returns distinct wrappers of the tasks.
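To illustrate why the wrapper matters, here is a minimal sketch, assuming a hypothetical function that hands out one shared, still-pending task for two different items. The HideIdentityDemo class and its Run method are made up for this illustration; only the shape of HideIdentity comes from the code above.

```csharp
using System;
using System.Threading.Tasks;

public static class HideIdentityDemo
{
    // Same shape as the HideIdentity local function above, fixed to int.
    static async Task<int> HideIdentity(Task<int> task) => await task;

    // Returns (wrappers are distinct, wrapper differs from the shared task).
    public static (bool, bool) Run()
    {
        // Hypothetical scenario: one pending task shared by two items.
        Task<int> shared = new TaskCompletionSource<int>().Task;

        Task<int> first = HideIdentity(shared);
        Task<int> second = HideIdentity(shared);

        return (!ReferenceEquals(first, second), !ReferenceEquals(first, shared));
    }

    public static void Main()
    {
        var (distinct, differs) = Run();
        Console.WriteLine($"{distinct} {differs}"); // prints "True True"
    }
}
```

Without the wrapper, DistinctUntilChanged would see the same Task reference twice in a row and drop the second occurrence, even though it belongs to a different item.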
Usage example:
Observable
    .Interval(TimeSpan.FromMilliseconds(200))
    .Select(x => (int)x + 1)
    .Take(10)
    .Do(x => Console.WriteLine($"Input: {x}"))
    .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
    .Do(x => Console.WriteLine($"Result: {x}"))
    .Wait();
Output:
Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9
Update: Here is an alternative implementation, where the function produces an IObservable<TResult> instead of a Task<TResult>:
/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> function)
{
    return Observable.Defer(() =>
    {
        int mutex = 0; // 0: not acquired, 1: acquired
        return source.SelectMany(item =>
        {
            // Attempt to acquire the mutex immediately. If successful, return
            // a sequence that releases the mutex when terminated. Otherwise,
            // return immediately an empty sequence.
            if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                return function(item).Finally(() => Volatile.Write(ref mutex, 0));
            return Observable.Empty<TResult>();
        });
    });
}
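The behavior of this variant can be checked deterministically with subjects instead of timers. The following is a sketch only: it repeats the operator so that it compiles on its own, and the ExhaustMapDemo class, the Run method and the inners dictionary are names invented for the example. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ExhaustMapDemo
{
    // Copy of the IObservable-based operator above.
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> function)
    {
        return Observable.Defer(() =>
        {
            int mutex = 0; // 0: not acquired, 1: acquired
            return source.SelectMany(item =>
            {
                if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                    return function(item).Finally(() => Volatile.Write(ref mutex, 0));
                return Observable.Empty<TResult>();
            });
        });
    }

    public static List<string> Run()
    {
        var source = new Subject<int>();
        var inners = new Dictionary<int, Subject<string>>(); // one inner subject per projected item
        var results = new List<string>();

        using var subscription = source
            .ExhaustMap(item =>
            {
                var inner = new Subject<string>();
                inners[item] = inner;
                return inner.Select(text => $"{item}:{text}");
            })
            .Subscribe(x => results.Add(x));

        source.OnNext(1);          // starts inner sequence 1
        source.OnNext(2);          // ignored, inner sequence 1 is still active
        inners[1].OnNext("a");
        inners[1].OnCompleted();   // Finally releases the mutex
        source.OnNext(3);          // starts inner sequence 3
        inners[3].OnNext("b");

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1:a, 3:b"
    }
}
```

Item 2 never reaches the function at all, because the mutex check happens before the projection is invoked.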
Rx.NET - Set capacity and drop oldest
The Rx observables don't have the property of "being full". An observable sequence is not a storage of messages, like a Queue<T> or a Channel<T> is. It's just a generator/propagator of messages. Some Rx operators have internal queues in order to perform their work, like the Concat and the Zip operators for example. Generally these queues are hidden, and cannot be configured to be "lossy".
An Rx component that might have the functionality that you are looking for is the ReplaySubject<T>. This component can be configured with the maximum number of messages that it can replay (int bufferSize), and with the maximum duration that it can store each message before discarding it (TimeSpan window). If you set the bufferSize but not the window, the ReplaySubject<T> will eventually buffer the specified number of items, and then the buffer will retain the same size forever. Each incoming message will cause the oldest buffered message to be dropped. A ReplaySubject<T> is not a consumable queue like the Channel<T>. It is always ready to propagate all the messages in its buffer, to any new subscribers that might come by in the future.
The ReplaySubject<T> is used as propagator by the Replay operator, similarly to how the Publish operator is backed internally by a Subject<T>.
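As a small sketch of the "drop oldest" behavior described above, assuming the System.Reactive package is referenced (the ReplaySubjectDemo class and its Run method are names made up for the example):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ReplaySubjectDemo
{
    public static List<int> Run()
    {
        // Keep at most the 3 most recent messages, with no time window.
        var subject = new ReplaySubject<int>(bufferSize: 3);

        for (int i = 1; i <= 5; i++)
            subject.OnNext(i); // 1 and 2 are dropped once 4 and 5 arrive

        // A late subscriber receives whatever is currently buffered.
        var replayed = new List<int>();
        using (subject.Subscribe(x => replayed.Add(x))) { }

        return replayed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "3, 4, 5"
    }
}
```

Subscribing a second time would replay 3, 4, 5 again, which is the difference from a consumable queue mentioned earlier.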
Reactive Extensions - Switch, but wait until previous observable completes instead cancelling
MergeBounded(1,1) mentioned by @TheodorZoulias works, but it seems a little bit too complex.
I have written a simpler alternative, but I'm looking for the most appropriate name:
/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat, but it ignores out of date inner observable sequences.
/// Similar to Exhaust, but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        IObservable<T> latest = default;
        return source
            .Select(inner =>
            {
                latest = inner;
                // Evaluated when Concat reaches this inner sequence: subscribe to it
                // only if no newer inner sequence has arrived in the meantime.
                return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
            })
            .Concat(); // Concat, not ConcatExhaust: calling itself here would recurse endlessly
    });
}
The following test:
var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
    .Take(5)
    .Do(val => Console.WriteLine($"Source: {val}"));
source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
    .ConcatExhaust()
    .Wait();
returns:
Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4
How to implement a better Finally Rx operator?
Here is an implementation of the FinallySafe operator, having the behavior specified in the question:
/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
    Action finallyAction)
{
    return Observable.Create<T>(observer =>
    {
        var finallyOnce = Disposable.Create(finallyAction);
        var subscription = source.Subscribe(observer.OnNext, error =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnError(error);
        }, () =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnCompleted();
        });
        return new CompositeDisposable(subscription, finallyOnce);
    });
}
The finallyAction is assigned as the Dispose action of a Disposable.Create disposable instance, in order to ensure that the action will be invoked at most once. This disposable is then combined with the disposable subscription of the source, by using a CompositeDisposable instance.
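The at-most-once guarantee can be seen in isolation with a short sketch, assuming the System.Reactive package is referenced. The DisposeOnceDemo class and its Run method are invented for the illustration, and Disposable.Empty stands in for the source subscription.

```csharp
using System;
using System.Reactive.Disposables;

public static class DisposeOnceDemo
{
    public static int Run()
    {
        int invocations = 0;
        IDisposable finallyOnce = Disposable.Create(() => invocations++);

        // Stand-in for CompositeDisposable(subscription, finallyOnce).
        var composite = new CompositeDisposable(Disposable.Empty, finallyOnce);

        finallyOnce.Dispose(); // as in the OnError/OnCompleted handlers
        composite.Dispose();   // as in a later unsubscription, disposes finallyOnce again

        return invocations;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "1"
    }
}
```

The second Dispose call is a no-op, so it does not matter whether the sequence terminates first or the observer unsubscribes first.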
As a side note, I would like to address the question of whether we could go even further, and propagate downstream a possible error of the finallyAction during the unsubscription. This could be desirable in some cases, but unfortunately it's not possible. First and foremost, doing so would violate a guideline, found in The Observable Contract document, that states:
When an observer issues an Unsubscribe notification to an Observable, the Observable will attempt to stop issuing notifications to the observer. It is not guaranteed, however, that the Observable will issue no notifications to the observer after an observer issues it an Unsubscribe notification.
So such an implementation would be non-conforming. Even worse, the Observable.Create method enforces this guideline, by muting the observer immediately after the subscription is disposed. It does so by encapsulating the observer inside an AutoDetachObserver wrapper. And even if we tried to circumvent this limitation by implementing an IObservable<T> type from scratch, any built-in operator that could be attached after our non-conforming Finally operator would mute our post-unsubscription OnError notification anyway. So it's just not possible. An error during the unsubscription cannot be propagated to the subscriber that just requested to unsubscribe.
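The muting can be observed with a small experiment. This is a sketch under the assumption that the System.Reactive package is referenced; the MutedObserverDemo class, the captured variable and the Run method are names made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class MutedObserverDemo
{
    public static List<string> Run()
    {
        IObserver<int> captured = null;
        var received = new List<string>();

        // Leak the observer handed to the Create delegate, to be able to
        // call it after the subscription has been disposed.
        var observable = Observable.Create<int>(observer =>
        {
            captured = observer;
            return Disposable.Empty;
        });

        var subscription = observable.Subscribe(
            x => received.Add($"OnNext {x}"),
            ex => received.Add($"OnError {ex.Message}"));

        captured.OnNext(1);      // delivered
        subscription.Dispose();  // unsubscribe
        captured.OnError(new InvalidOperationException("late")); // muted

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "OnNext 1"
    }
}
```

The late OnError never reaches the subscriber, which is why an error thrown by the finallyAction during unsubscription has nowhere to go.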
Rx.net implement retry functionality on disconnect/error in observable
If you control ITransportService, I would recommend adding a member that exposes the connection state as an observable:
public interface ITransportService
{
    ConnectionState State { get; }
    bool Connect();
    IObservable<FooData> GetObservable();
    IObservable<ConnectionState> GetConnectionStateObservable();
}
Once you can get the states in an observable fashion, producing the observable becomes easier:
public class FooService
{
    private ITransportService _transportService;

    public FooService(ITransportService transportService)
    {
        _transportService = transportService;
        _transportService.Connect();
    }

    public IDisposable Subscribe(IObserver<FooData> observer)
    {
        return _transportService.GetConnectionStateObservable()
            .Select(cs => cs == ConnectionState.Open)
            .DistinctUntilChanged()
            .Select(isOpen => isOpen
                ? _transportService.GetObservable() //if open, return observable
                : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                    .IgnoreElements()
                    .Select(_ => default(FooData))
                    .Concat(Observable.Never<FooData>())
            )
            .Switch()
            .Subscribe(observer);
    }
}
If you don't control ITransportService, I would recommend creating an interface that inherits from it, where you can add a similar member.
As an aside, I would recommend you ditch FooObserver; you almost never need to fashion your own observer. Expose the observable, and calling a Subscribe overload on the observable normally does the trick.
I can't test any of this though: there's no clarity as to what the retry logic should be like, what the return value of Connect means, or what the ConnectionState class is, and the code doesn't compile. You should try to fashion your question as an MCVE (minimal, complete, verifiable example).
UPDATE:
The following handles the test code as expected:
public IDisposable Subscribe(IObserver<FooData> observer)
{
    return _transportService.GetConnectionStateObservable()
        .Select(cs => cs == ConnectionState.Open)
        .DistinctUntilChanged()
        .Select(isOpen => isOpen
            ? _transportService.GetObservable() //if open, return observable
                .Catch(Observable.Never<FooData>())
            : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                .IgnoreElements()
                .Select(_ => default(FooData))
                .Concat(Observable.Never<FooData>())
        )
        .Switch()
        .Subscribe(observer);
}
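The only change from the originally posted code is the additional .Catch(Observable.Never<FooData>()), which swallows an error from the data observable and leaves the sequence silent until the next connection state change. As written, this code will run forever. I hope you have some way to terminate the observable external to what's posted.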