With Rx, How to Ignore All-Except-The-Latest Value When My Subscribe Method Is Running

Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:

// Place inside a static class; needs these namespaces:
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null; // latest notification not yet delivered
        var gate = new object();
        bool active = false; // true while a delivery is scheduled or running
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                // Overwrite any notification that has not been delivered yet.
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        // Stay active only if something arrived during delivery.
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
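
For comparison, the same keep-only-the-latest idea can be sketched in JavaScript, where the single-threaded event loop removes the need for a lock. This is a minimal sketch under stated assumptions, not a port of the extension method above: latestOnly, onNext, and schedule are hypothetical names, and schedule stands in for the Rx scheduler (anything that runs a callback later). The usage at the bottom uses a manual task queue so the effect is easy to follow.

```javascript
// Hypothetical helper: wraps a consumer so that values arriving while a
// delivery is scheduled or running overwrite each other; only the latest survives.
function latestOnly(onNext, schedule) {
  let hasPending = false; // true while a value is waiting to be delivered
  let pending;            // most recent undelivered value
  let active = false;     // true while a drain is scheduled or running

  function drain() {
    const value = pending;
    hasPending = false;
    pending = undefined;
    onNext(value);
    // Something may have arrived while onNext was running.
    if (hasPending) {
      schedule(drain);
    } else {
      active = false;
    }
  }

  return function push(value) {
    pending = value; // overwrite anything not yet delivered
    hasPending = true;
    if (!active) {
      active = true;
      schedule(drain);
    }
  };
}

// Usage with a manual queue standing in for the scheduler (an assumption
// made for illustration; setTimeout-style scheduling would also fit).
const tasks = [];
const seen = [];
const push = latestOnly((v) => seen.push(v), (task) => tasks.push(task));
push(1);
push(2);
push(3);         // three values arrive before the consumer gets to run
tasks.shift()(); // the consumer runs once and sees only the value 3
```

Only one delivery is ever queued at a time, which is the same role the active flag plays in the C# version.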

RxJS: How to not subscribe to initial value and/or undefined?

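Use new Rx.ReplaySubject(1) instead of BehaviorSubject. A ReplaySubject has no initial value, so subscribers receive nothing until the first value is pushed; after that, each new subscriber gets the most recent value.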
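
The difference can be illustrated with two deliberately simplified stand-ins. These are not the RxJS classes and they ignore completion, errors, and unsubscription; TinyBehaviorSubject and TinyReplaySubject1 are hypothetical names used only to show when a subscriber first receives a value.

```javascript
// Simplified models for illustration only, NOT the RxJS implementations.
class TinyBehaviorSubject {
  constructor(initial) {
    this.current = initial; // a seed is mandatory, even if it is undefined
    this.observers = [];
  }
  next(value) {
    this.current = value;
    this.observers.forEach((o) => o(value));
  }
  subscribe(observer) {
    this.observers.push(observer);
    observer(this.current); // always replays, including the seed
  }
}

class TinyReplaySubject1 {
  constructor() {
    this.hasValue = false; // nothing to replay until next() is called
    this.last = undefined;
    this.observers = [];
  }
  next(value) {
    this.hasValue = true;
    this.last = value;
    this.observers.forEach((o) => o(value));
  }
  subscribe(observer) {
    this.observers.push(observer);
    if (this.hasValue) observer(this.last); // replay only a real value
  }
}

const fromBehavior = [];
const fromReplay = [];
const behavior = new TinyBehaviorSubject(undefined);
const replay = new TinyReplaySubject1();
behavior.subscribe((v) => fromBehavior.push(v)); // gets the undefined seed at once
replay.subscribe((v) => fromReplay.push(v));     // gets nothing yet
behavior.next("a");
replay.next("a");
// fromBehavior now holds undefined followed by "a"; fromReplay holds only "a"
```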

How to ignore all except the last one of a series of quick Javascript events?

The classic way is to use a short timeout:

var cursorTimer;
function changeCursor() {
    clearTimeout(cursorTimer);
    cursorTimer = setTimeout(function() {
        // process the actual cursor change here
    }, 500);
}

Your regular code can continue calling changeCursor() every time it changes (just like it does now), but the actual code inside the setTimeout() will only execute when no cursor change events have occurred in the last 500ms. You can adjust that time value as desired.

The only way to know that events have stopped is to wait some short period of time and detect no further movement (which is what this does). It is common to use similar logic with scroll events.
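
The timeout pattern above can be wrapped in a reusable helper. The following is a minimal sketch, not a library API: debounce, delayMs, and the optional timers parameter are hypothetical names. The timers parameter exists only so the behaviour can be demonstrated with a fake timer instead of a real 500 ms wait; in normal use it can be omitted.

```javascript
// Hypothetical helper: returns a wrapper that runs fn only after delayMs
// have passed without another call to the wrapper.
function debounce(fn, delayMs, timers) {
  // Default to the real timers; wrapped in arrows so they are not called
  // with the wrong `this`.
  const t = timers || {
    set: (cb, ms) => setTimeout(cb, ms),
    clear: (handle) => clearTimeout(handle),
  };
  let handle;
  return function (...args) {
    t.clear(handle); // cancel the previously scheduled run, if any
    handle = t.set(() => fn.apply(this, args), delayMs);
  };
}

// Demonstration with a fake timer that just remembers the latest callback.
let scheduled = null;
const fakeTimers = {
  set: (cb) => { scheduled = cb; return 1; },
  clear: () => { scheduled = null; },
};
const calls = [];
const onCursorChange = debounce((x) => calls.push(x), 500, fakeTimers);
onCursorChange("a");
onCursorChange("b");
onCursorChange("c"); // each call cancels the one before it
scheduled();         // simulate 500 ms with no further calls
// calls now holds only "c"
```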

Rx Subscribe() callbacks issue

The subscription to the observable sequence is asynchronous. Your while loop keeps going as fast as it can. Increments will outweigh the delayed decrements.

If you're using Rx v2.0 with .NET 4.5, you can use await inside the loop so that each iteration starts only after the previous request has completed:

while (true)
{
    var obs = ...;

    RequestCounter.Increment();

    await obs.Select(...).Timeout(...);

    RequestCounter.Decrement();

    Console.WriteLine(...);
}

The bigger question is why you're trying to dispose the subscriptions once the requests have completed. Rx cleans up after itself when a sequence reaches a terminal state (OnError, OnCompleted), so it seems you could simply drop the IDisposable objects on the floor and not worry about them.

How to ignore error and continue infinite stream?

mRestService.postLocations(locations) emits one item, then completes.
If an error occurs, it emits the error instead, which terminates the stream.

Because you call this method inside a flatMap, the error propagates to your "main" stream, and that stream stops.

What you can do is transform the error into another item (as described here: https://stackoverflow.com/a/28971140/476690 ), not on your main stream (as I presume you already tried) but on the mRestService.postLocations(locations) call itself.

This way, when the call fails, the error is transformed into an item (or another observable) and the inner stream then completes normally, without calling onError.

From the consumer's point of view, mRestService.postLocations(locations) emits one item and then completes, as if everything had succeeded.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations)
                .onErrorReturn(e -> Collections.emptyList())) // can't throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

Ignore incoming stream updates if last callback hasn't finished yet

If DoUpdate is synchronous (which it appears to be in this case), you can use BufferIntrospective from Rxx. It does exactly what you want:

IProduceDemUpdates
    .BufferIntrospective()
    .Where(items => items.Count > 0) // ignore empty buffers
    .Select(items => items[items.Count - 1]) // ignore all but last item in buffer
    .Subscribe(DoUpdate);

