Does reactive extensions support rolling buffers?
I wrote an extension to do most of what you're after - BufferWithInactivity
.
Here it is:
public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
this IObservable<T> source,
TimeSpan inactivity,
int maximumBufferSize)
{
return Observable.Create<IEnumerable<T>>(o =>
{
var gate = new object();
var buffer = new List<T>();
var mutable = new SerialDisposable();
var subscription = (IDisposable)null;
var scheduler = Scheduler.ThreadPool;
Action dump = () =>
{
var bts = buffer.ToArray();
buffer = new List<T>();
if (o != null)
{
o.OnNext(bts);
}
};
Action dispose = () =>
{
if (subscription != null)
{
subscription.Dispose();
}
mutable.Dispose();
};
Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
onAction =>
{
lock (gate)
{
dispose();
dump();
if (o != null)
{
onAction(o);
}
}
};
Action<Exception> onError = ex =>
onErrorOrCompleted(x => x.OnError(ex));
Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());
Action<T> onNext = t =>
{
lock (gate)
{
buffer.Add(t);
if (buffer.Count == maximumBufferSize)
{
dump();
mutable.Disposable = Disposable.Empty;
}
else
{
mutable.Disposable = scheduler.Schedule(inactivity, () =>
{
lock (gate)
{
dump();
}
});
}
}
};
subscription =
source
.ObserveOn(scheduler)
.Subscribe(onNext, onError, onCompleted);
return () =>
{
lock (gate)
{
o = null;
dispose();
}
};
});
}
Reactive Extension buffer from buffers
Yes, it's easy. Just try this:
var source = Observable.Range(0, 100).Buffer(5)
var target = source.SelectMany(x => x).Buffer(7);
My source
has a buffer length of 5
that gets converted to a buffer length of 7
in target
.
How do I obtain a rolling buffer of the last two items emitted from a reactive stream?
RXJS 4
You maybe don't even need a buffer
for this, a simple concatMap
might work for you (of course I don't know any details of your stream:
observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);
observable
.bufferWithCount(2, 1)
.subscribe(all => {
console.log(all);
});
See live here
Buffer until quiet behavior from Reactive?
Some similar questions exist on SO but not exactly like this.
Here's an extension method that does the trick.
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
Reactive Throttle returning all items added within the TimeSpan
As I answered in the other post, yes you can! Using the Throttle
and Window
methods of Observable
:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
}
Is there a Reactive Framework Subject that buffers until it doesn't need to?
The answer appears to be "No". You could write one on your own with relatively little trouble, though it would look a lot like a ReplaySubject
, just one which drops its queue entirely upon subscription. Constructing one from pieces is fraught with race conditions.
The problem arose for me when I wrote an API that exposed IObservable<TConnection>
where TConnection
had a member IObservable<TMessage>
. People who observed the new TConnection
needed to be sure that they didn't miss any TMessage
s on that connection. The underlying event source made that slightly awkward. I require that, if an observer does not want to miss any messages, that he subscribe to the IObservable<TMessage>
before the call to OnNext(TConnection)
completes. This is almost always what someone will want to do, they just need to be sure they don't do something silly like .ObserveOn(Scheduler.TaskPool)
for their IObservable<TConnection>
. If programmers agree to do this, then indeed no buffering is required at all (in this particular implementation) and a regular Subject
will do the trick.
Reactive Extensions Buffer on count, interval and event
This is the working solution I have. Extra console.log()'s are added to show the sequence of events.
The only thing that's a bit bothersome is the .skip(1)
in fullBufferTrigger, but it's needed as it will trigger when it's buffer is full (natch), but the buffer in bufferedEvent$
does not seem to have the latest event before it's triggered.
Luckily, with the timeoutTrigger
in place, the last event gets emitted. Without timeout, fullBufferTrigger by itself will not emit the final event.
Also, changed buffer
to bufferWhen
, as the former did not seem to trigger with two triggers, although you'd expect it to from the documentation.
footnote with buffer(race())
the race only completes once, so whichever trigger got there first will thereafter be used and the other triggers dis-regarded. In contrast, bufferWhen(x => race())
evaluates every time an event occurs.
const bufferPeriodMs = 1000
const event$ = new Subject()
event$.subscribe(event => console.log('event$ emit', event))
// Define triggers here for testing individually
const beforeunloadTrigger = Observable.fromEvent(window, 'beforeunload')
const fullBufferTrigger = event$.skip(1).bufferCount(2)
const timeoutTrigger = Observable.interval(bufferPeriodMs).take(10)
const bufferedEvent$ = event$
.bufferWhen( x =>
Observable.race(
fullBufferTrigger,
timeoutTrigger
)
)
.filter(events => events.length > 0)
// output
fullBufferTrigger.subscribe(x => console.log('fullBufferTrigger', x))
timeoutTrigger.subscribe(x => console.log('timeoutTrigger', x))
bufferedEvent$.subscribe(events => {
console.log('subscription', events)
})
// Test sequence
const delayBy = n => (bufferPeriodMs * n) + 500
event$.next('event1')
event$.next('event2')
event$.next('event3')
setTimeout( () => {
event$.next('event4')
}, delayBy(1))
setTimeout( () => {
event$.next('event5')
}, delayBy(2))
setTimeout( () => {
event$.next('event6')
event$.next('event7')
}, delayBy(3))
Working example: CodePen
Edit: Alternative way to trigger the buffer
Since the combination of bufferWhen
and race
might be a bit inefficient (the race is restarted each event emission), an alternative is to merge the triggers into one stream and use a simple buffer
const bufferTrigger$ = timeoutTrigger
.merge(fullBufferTrigger)
.merge(beforeunloadTrigger)
const bufferedEvent$ = event$
.buffer(bufferTrigger$)
.filter(events => events.length > 0)
reactive extensions sliding time window
Would this extension method solve your problem?
public static IObservable<T[]> RollingBuffer<T>(
this IObservable<T> @this,
TimeSpan buffering)
{
return Observable.Create<T[]>(o =>
{
var list = new LinkedList<Timestamped<T>>();
return @this.Timestamp().Subscribe(tx =>
{
list.AddLast(tx);
while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
{
list.RemoveFirst();
}
o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
}, ex => o.OnError(ex), () => o.OnCompleted());
});
}
Combine streams with maximum time interval between values
You can just use Rx combinators. Since, your primary aim is Zip
, let's start with Zip
, and then apply your expiry conditions.
public static IObservable<TOut> ZipWithExpiry<TLeft, TRight, TOut>(
IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, TRight, TOut> selector,
TimeSpan validity)
{
return Observable.Zip(left.Timestamp(), right.Timestamp(), (l, r) => Tuple.Create(l, r))
.Where(tuple => Math.Abs((tuple.Item1.Timestamp - tuple.Item2.Timestamp).TotalSeconds) < validity.TotalSeconds)
.Select(tuple => selector(tuple.Item1.Value, tuple.Item2.Value));
}
If you want to check the adjacent values in a stream, you can rewrite it using TimeInterval
operator instead of Timestamp
.
Related Topics
Can Razor Class Library Pack Static Files (Js, CSS etc) Too
Entity Framework - Stored Procedure Return Value
How to Print PDF on Default Network Printer Using Ghostscript (Gswin32C.Exe) Shell Command
How to Drag a Usercontrol Inside a Canvas
Read from a File Starting at the End, Similar to Tail
Write PDF Stream to Response Stream
Gracefully Handling Corrupted State Exceptions
How to Concatenate Two System.Io.Stream Instances into One
Validate JSON Against JSON Schema C#
Expose and Raise Event of a Child Control in a Usercontrol in C#