How to Implement an ExhaustMap Handler in Rx.NET

How can I implement an exhaustMap handler in Rx.NET?

Here is an implementation of the ExhaustMap operator. The source observable is projected to an IObservable<Task<TResult>>, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged operator, and finally the observable is flattened with the Concat operator.

/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
{
    return source
        .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
        {
            return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
        })
        .DistinctUntilChanged()
        .Concat();

    async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}

The tasks returned by the function are not guaranteed to be distinct, hence the need for the HideIdentity local function that returns distinct wrappers of the tasks.

Usage example:

Observable
    .Interval(TimeSpan.FromMilliseconds(200))
    .Select(x => (int)x + 1)
    .Take(10)
    .Do(x => Console.WriteLine($"Input: {x}"))
    .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
    .Do(x => Console.WriteLine($"Result: {x}"))
    .Wait();

Output:

Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9

Update: Here is an alternative implementation, where the function produces an IObservable<TResult> instead of a Task<TResult>:

/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> function)
{
    return Observable.Defer(() =>
    {
        int mutex = 0; // 0: not acquired, 1: acquired
        return source.SelectMany(item =>
        {
            // Attempt to acquire the mutex immediately. If successful, return
            // a sequence that releases the mutex when terminated. Otherwise,
            // return immediately an empty sequence.
            if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                return function(item).Finally(() => Volatile.Write(ref mutex, 0));
            return Observable.Empty<TResult>();
        });
    });
}

Rx.NET - Set capacity and drop oldest

The Rx observables don't have the property of "being full". An observable sequence is not a storage of messages, like a Queue<T> or a Channel<T> is. It's just a generator/propagator of messages. Some Rx operators have internal queues in order to perform their work, like the Concat and the Zip operators for example. Generally these queues are hidden, and cannot be configured to be "lossy".

An Rx component that might have the functionality that you are looking for is the ReplaySubject<T>. This component can be configured with the maximum number of messages that it can replay (int bufferSize), and with the maximum duration that it can store each message before discarding it (TimeSpan window). If you set the bufferSize but not the window, the ReplaySubject<T> will eventually buffer the specified number of items, and then the buffer will retain the same size forever. Each incoming message will cause the oldest buffered message to be dropped. A ReplaySubject<T> is not a consumable queue like the Channel<T>. It is always ready to propagate all the messages in its buffer, to any new subscribers that might come by in the future.

The ReplaySubject<T> is used as propagator by the Replay operator, similarly to how the Publish operator is backed internally by a Subject<T>.
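The drop-oldest behavior described above can be seen in a minimal sketch. It assumes that the System.Reactive NuGet package is referenced and that C# top-level statements are available; the buffer size of 3 and the variable names are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Keep at most the 3 most recent messages; older ones are dropped.
var subject = new ReplaySubject<int>(bufferSize: 3);

// Push 5 messages before anyone subscribes: 1 and 2 fall out of the buffer.
for (int i = 1; i <= 5; i++) subject.OnNext(i);

var first = new List<int>();
subject.Subscribe(x => first.Add(x));   // replays 3, 4, 5 immediately

subject.OnNext(6);                      // live message; 3 is dropped from the buffer

var second = new List<int>();
subject.Subscribe(x => second.Add(x));  // replays 4, 5, 6

Console.WriteLine($"First:  {string.Join(", ", first)}");
Console.WriteLine($"Second: {string.Join(", ", second)}");
```

The first subscriber sees 3, 4, 5, 6 and the second sees 4, 5, 6. Note that subscribing does not consume the buffer: the same messages are replayed to every late subscriber.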

Reactive Extensions - Switch, but wait until previous observable completes instead of cancelling

MergeBounded(1,1) mentioned by @TheodorZoulias works, but it seems a little too complex.

I have written a simpler alternative, but I'm still looking for the most appropriate name:

/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat, but it ignores out of date inner observable sequences.
/// Similar to Exhaust, but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        IObservable<T> latest = default;
        return source
            .Select(inner =>
            {
                latest = inner;
                // Evaluated only when Concat reaches this inner sequence: if a
                // newer one has arrived in the meantime, this one is skipped.
                return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
            })
            .Concat();
    });
}

The following test:

var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
    .Take(5)
    .Do(val => Console.WriteLine($"Source: {val}"));

source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
    .ConcatExhaust()
    .Wait();

returns:

Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4

How to implement a better Finally Rx operator?

Here is an implementation of the FinallySafe operator, having the behavior specified in the question:

/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
    Action finallyAction)
{
    return Observable.Create<T>(observer =>
    {
        var finallyOnce = Disposable.Create(finallyAction);
        var subscription = source.Subscribe(observer.OnNext, error =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnError(error);
        }, () =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnCompleted();
        });
        return new CompositeDisposable(subscription, finallyOnce);
    });
}

The finallyAction is assigned as the Dispose action of a Disposable.Create disposable instance, in order to ensure that the action will be invoked at most once. This disposable is then combined with the disposable subscription of the source, by using a CompositeDisposable instance.
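The at-most-once guarantee can be checked in isolation with a small sketch. It assumes the System.Reactive NuGet package; the counter and the use of Disposable.Empty as a stand-in for the source subscription are illustrative only.

```csharp
using System;
using System.Reactive.Disposables;

int invocations = 0;
IDisposable finallyOnce = Disposable.Create(() => invocations++);

// Disposable.Empty stands in for the subscription to the source.
var composite = new CompositeDisposable(Disposable.Empty, finallyOnce);

finallyOnce.Dispose();  // the source terminates: the action runs
composite.Dispose();    // the observer unsubscribes afterwards: the action does not run again

Console.WriteLine(invocations); // 1
```

This is why the operator can safely dispose finallyOnce from both the termination handlers and the returned CompositeDisposable.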

As a side note, I would like to address the question of whether we could go even further, and propagate downstream a possible error of the finallyAction during the unsubscription. This could be desirable in some cases, but unfortunately it's not possible. First and foremost, doing so would violate a guideline, found in The Observable Contract document, that states:

When an observer issues an Unsubscribe notification to an Observable, the Observable will attempt to stop issuing notifications to the observer. It is not guaranteed, however, that the Observable will issue no notifications to the observer after an observer issues it an Unsubscribe notification.

So such an implementation would be non-conforming. Even worse, the Observable.Create method enforces this guideline, by muting the observer immediately after the subscription is disposed. It does so by encapsulating the observer inside an AutoDetachObserver wrapper. And even if we tried to circumvent this limitation by implementing an IObservable<T> type from scratch, any built-in operator that could be attached after our non-conforming Finally operator would mute our post-unsubscription OnError notification anyway. So it's just not possible. An error during the unsubscription cannot be propagated to the subscriber that just requested to unsubscribe.
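The muting performed by Observable.Create can be observed with a short sketch. It assumes the System.Reactive NuGet package; leaking the observer into a list is a deliberately contrived device, used here only to push notifications after the subscription has been disposed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Contrived: capture the observer handed to the Create delegate so that
// notifications can be pushed from outside, even after unsubscription.
var leaked = new List<IObserver<int>>();
var source = Observable.Create<int>(observer =>
{
    leaked.Add(observer);
    return Disposable.Empty;
});

var received = new List<string>();
IDisposable subscription = source.Subscribe(
    x => received.Add($"OnNext({x})"),
    ex => received.Add($"OnError({ex.Message})"));

leaked[0].OnNext(1);        // delivered
subscription.Dispose();     // the wrapper around the observer is now detached
leaked[0].OnError(new InvalidOperationException("Too late")); // silently dropped

Console.WriteLine(string.Join(", ", received)); // OnNext(1)
```

The late OnError never reaches the subscriber, which is the same fate that an error thrown by the finallyAction during unsubscription would meet.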

Rx.NET implement retry functionality on disconnect/error in observable

If you control ITransportService, I would recommend adding a member that exposes the connection state as an observable:

public interface ITransportService
{
    ConnectionState State { get; }
    bool Connect();
    IObservable<FooData> GetObservable();
    IObservable<ConnectionState> GetConnectionStateObservable();
}

Once you can get the states in an observable fashion, producing the observable becomes easier:

public class FooService
{
    private ITransportService _transportService;
    public FooService(ITransportService transportService)
    {
        _transportService = transportService;
        _transportService.Connect();
    }

    public IDisposable Subscribe(IObserver<FooData> observer)
    {
        return _transportService.GetConnectionStateObservable()
            .Select(cs => cs == ConnectionState.Open)
            .DistinctUntilChanged()
            .Select(isOpen => isOpen
                ? _transportService.GetObservable() //if open, return observable
                : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                    .IgnoreElements()
                    .Select(_ => default(FooData))
                    .Concat(Observable.Never<FooData>())
            )
            .Switch()
            .Subscribe(observer);
    }
}

If you don't control ITransportService, I would recommend creating an interface that inherits from it, where you can add a similar member.

As an aside, I would recommend you ditch FooObserver: you almost never need to fashion your own observer. Expose the observable instead; calling a Subscribe overload on it normally does the trick.

I can't test any of this though: there's no clarity as to what the retry logic should be like, what the return value for Connect means, or what the ConnectionState class is, and the code doesn't compile. You should try to fashion your question as an MCVE (minimal, complete, and verifiable example).
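As a rough sketch of that advice, the lambda-based Subscribe overloads cover what a hand-written observer class would do. The example assumes the System.Reactive NuGet package, and Observable.Range is only a hypothetical stand-in for the observable that the service would expose.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var log = new List<string>();

// Hypothetical stand-in for the observable exposed by the service.
IObservable<int> data = Observable.Range(1, 3);

// No custom IObserver<T> implementation is needed: pass the handlers directly.
using IDisposable subscription = data.Subscribe(
    onNext: x => log.Add($"Data: {x}"),
    onError: ex => log.Add($"Error: {ex.Message}"),
    onCompleted: () => log.Add("Completed"));

foreach (var line in log) Console.WriteLine(line);
```

This prints the three data lines followed by "Completed". The returned IDisposable is all a consumer needs in order to unsubscribe.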


UPDATE:

The following handles the test code as expected:

public IDisposable Subscribe(IObserver<FooData> observer)
{
    return _transportService.GetConnectionStateObservable()
        .Select(cs => cs == ConnectionState.Open)
        .DistinctUntilChanged()
        .Select(isOpen => isOpen
            ? _transportService.GetObservable() //if open, return observable
                .Catch(Observable.Never<FooData>())
            : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                .IgnoreElements()
                .Select(_ => default(FooData))
                .Concat(Observable.Never<FooData>())
        )
        .Switch()
        .Subscribe(observer);
}

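The only change from the originally posted code is the additional .Catch(Observable.Never<FooData>()), which swallows an error from the data observable and keeps the sequence open until the next connection state change. As written, this code will run forever. I hope you have some way to terminate the observable external to what's posted.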
