With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running
Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                // Overwrite any notification that has not been delivered yet.
                outsideNotification = thisNotification;
            }
            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        // Take the latest notification and clear the slot.
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        // Stay active only if something arrived while the observer was running.
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
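The same keep-only-the-latest idea can be sketched outside Rx. The following is a minimal JavaScript illustration, not a port of the extension method: latestOnly and its schedule parameter are hypothetical names, and there is no locking because the sketch assumes single-threaded JavaScript.

```javascript
// Hypothetical helper (not part of any Rx library): wraps a handler so that
// values arriving while a delivery is scheduled or running overwrite each
// other, and only the most recent one is delivered next.
function latestOnly(handler, schedule = (task) => setTimeout(task, 0)) {
  let pending;            // most recent undelivered value
  let hasPending = false; // true while `pending` holds an undelivered value
  let active = false;     // true while a delivery is scheduled or running

  function drain() {
    const value = pending;
    pending = undefined;
    hasPending = false;
    try {
      handler(value);
    } finally {
      if (hasPending) {
        schedule(drain); // a newer value arrived while the handler ran
      } else {
        active = false;
      }
    }
  }

  return function push(value) {
    pending = value;     // overwrite whatever was waiting
    hasPending = true;
    if (!active) {
      active = true;
      schedule(drain);
    }
  };
}
```

Passing a custom schedule function plays roughly the role of the IScheduler argument: it decides where and when the handler runs.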
RxJS: How to not subscribe to initial value and/or undefined?
Use new Rx.ReplaySubject(1) instead of BehaviorSubject.
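The difference matters at subscribe time: a BehaviorSubject always has a current value (starting with the initial one) and emits it to every new subscriber, while a ReplaySubject with a buffer of 1 emits nothing until a value has actually been pushed. The sketch below mimics only that behaviour with hand-rolled stand-ins; MiniBehaviorSubject and MiniReplaySubject1 are hypothetical classes, not the RxJS implementations.

```javascript
// Hand-rolled stand-ins that imitate only the subscribe-time behaviour
// discussed above. They are NOT the RxJS classes.
class MiniBehaviorSubject {
  constructor(initialValue) {
    this.current = initialValue;
    this.observers = [];
  }
  next(value) {
    this.current = value;
    this.observers.forEach((observer) => observer(value));
  }
  subscribe(observer) {
    this.observers.push(observer);
    observer(this.current); // always emits, even the initial (possibly undefined) value
  }
}

class MiniReplaySubject1 {
  constructor() {
    this.hasValue = false;
    this.last = undefined;
    this.observers = [];
  }
  next(value) {
    this.hasValue = true;
    this.last = value;
    this.observers.forEach((observer) => observer(value));
  }
  subscribe(observer) {
    this.observers.push(observer);
    if (this.hasValue) {
      observer(this.last); // replays only a value that was really pushed
    }
  }
}
```

A subscriber to the second class therefore never sees an initial or undefined value, yet late subscribers still receive the most recent real one.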
How to ignore all except the last one of a series of quick Javascript events?
The classic way is to use a short timeout:
var cursorTimer;
function changeCursor() {
    clearTimeout(cursorTimer);
    cursorTimer = setTimeout(function() {
        // process the actual cursor change here
    }, 500);
}
Your regular code can continue calling changeCursor() every time it changes (just like it does now), but the actual code inside the setTimeout() will only execute when no cursor change events have occurred in the last 500ms. You can adjust that time value as desired.
The only way to know that events have stopped is to wait some short period of time and detect no further movement (which is what this does). It is common to use similar logic with scroll events.
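The same clearTimeout/setTimeout pattern can be wrapped in a reusable helper. This is a minimal sketch rather than a library-grade debounce: the name debounce and the optional schedule and cancel parameters are assumptions added here so the timer functions can be swapped out, for example in tests.

```javascript
// Hypothetical generic helper built on the clearTimeout/setTimeout idea.
// `schedule` and `cancel` default to the real timers and exist only so
// they can be replaced.
function debounce(
  fn,
  waitMs,
  schedule = (task, ms) => setTimeout(task, ms),
  cancel = (id) => clearTimeout(id)
) {
  let timerId;
  return function debounced(...args) {
    cancel(timerId); // drop the previously scheduled call, if any
    timerId = schedule(() => fn.apply(this, args), waitMs);
  };
}

// Usage: only the last call in a quiet period of 500ms reaches the handler.
const changeCursorDebounced = debounce(function (cursor) {
  // process the actual cursor change here
}, 500);
```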
Rx Subscribe() callbacks issue
The subscription to the observable sequence is asynchronous. Your while loop keeps going as fast as it can. Increments will outweigh the delayed decrements.
If you're using Rx v2.0 with .NET 4.5, you could use await to run your loop and proceed to the next iteration only once the previous request has completed:
while (true)
{
    var obs = ...;
    RequestCounter.Increment();
    await obs.Select(...).Timeout(...);
    RequestCounter.Decrement();
    Console.WriteLine(...);
}
The bigger question is why you're trying to dispose of the subscriptions once the requests have completed. Rx cleans up after itself when a sequence reaches a terminal state (OnError, OnCompleted), so it seems you could simply drop the IDisposable objects on the floor and not worry about them.
How to ignore error and continue infinite stream?
mRestService.postLocations(locations) emits one item, then completes. If an error occurs, it emits the error instead, which terminates the stream.
Because you call this method inside a flatMap, the error propagates to your "main" stream, and that stream stops.
What you can do is transform the error into another item (as described here: https://stackoverflow.com/a/28971140/476690 ), not on your main stream (as I presume you already tried) but on mRestService.postLocations(locations).
This way, the call emits an error, which is transformed into an item (or another observable), and the call then completes without onError being called.
From the consumer's point of view, mRestService.postLocations(locations) emits one item and then completes, as if everything had succeeded.
mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations)
                .onErrorReturn(e -> Collections.emptyList())) // the inner call can't throw an exception anymore
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
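The key point, recovering inside the inner call rather than on the outer stream, can be illustrated with a synchronous analogy over plain arrays. This is not RxJava and not asynchronous: postAllBatches and its postLocations parameter are hypothetical stand-ins, and a thrown exception plays the role of onError.

```javascript
// Synchronous analogy only: each batch is "posted" inside the flatMap callback,
// and a failure is turned into an empty list right there, so the outer
// iteration never sees the error and keeps going.
function postAllBatches(batches, postLocations) {
  return batches.flatMap((batch) => {
    try {
      return postLocations(batch); // hypothetical call returning an array
    } catch (error) {
      return []; // same role as onErrorReturn(e -> Collections.emptyList())
    }
  });
}
```

If the try/catch were wrapped around the whole flatMap call instead, the first failing batch would abort the remaining ones, which mirrors what happens when the error is handled only on the main stream.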
Ignore incoming stream updates if last callback hasn't finished yet
If DoUpdate is synchronous (which it appears to be in this case), you can use BufferIntrospective from Rxx. It does exactly what you want:
IProduceDemUpdates
    .BufferIntrospective()
    .Where(items => items.Count > 0) // ignore empty buffers
    .Select(items => items[items.Count - 1]) // ignore all but last item in buffer
    .Subscribe(DoUpdate);