Does Reactive Extensions Support Rolling Buffers

Does reactive extensions support rolling buffers?

I wrote an extension to do most of what you're after - BufferWithInactivity.

Here it is:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        // Scheduler.ThreadPool comes from older Rx releases; newer ones expose Scheduler.Default.
        var scheduler = Scheduler.ThreadPool;

        // Emit the current buffer and start a new one.
        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        // On error or completion: stop the timer, flush what is left, then notify.
        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    // Buffer is full: flush now and cancel any pending inactivity timer.
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    // Restart the inactivity timer; assigning replaces (and cancels) the previous one.
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}
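
To make the flushing rules easier to follow, here is a minimal plain-JavaScript model of the same idea. It is only a sketch and does not use Rx: the function name, the `{ t, value }` event shape and the millisecond timestamps are assumptions made for illustration, and the inactivity timer is replaced by comparing timestamps.

```javascript
// Plain-JavaScript model of "buffer until quiet or full" (no Rx involved).
// `events` is a hypothetical array of { t, value } sorted by time t (ms).
function bufferWithInactivity(events, inactivityMs, maxSize) {
  const out = [];
  let buffer = [];
  let lastT = null;
  for (const { t, value } of events) {
    // A gap of at least `inactivityMs` since the previous item means the
    // inactivity timer would have fired, so flush what was collected.
    if (buffer.length > 0 && t - lastT >= inactivityMs) {
      out.push(buffer);
      buffer = [];
    }
    buffer.push(value);
    lastT = t;
    // Reaching the size limit flushes immediately.
    if (buffer.length === maxSize) {
      out.push(buffer);
      buffer = [];
    }
  }
  // Completion flushes whatever is left (this sketch skips an empty buffer).
  if (buffer.length > 0) out.push(buffer);
  return out;
}

const events = [
  { t: 0, value: 'a' }, { t: 10, value: 'b' }, { t: 20, value: 'c' },
  { t: 500, value: 'd' }, { t: 510, value: 'e' },
];
// 'a', 'b', 'c' fill the buffer; 'd' and 'e' are flushed at the end.
console.log(bufferWithInactivity(events, 100, 3));
```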

Reactive Extension buffer from buffers

Yes, it's easy. Just try this:

var source = Observable.Range(0, 100).Buffer(5);
var target = source.SelectMany(x => x).Buffer(7);

My source has a buffer length of 5 that gets converted to a buffer length of 7 in target.
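
The same flatten-then-rebuffer step can be sketched on plain arrays. This is not Rx code; `chunk` is a hypothetical helper introduced only to show how 20 buffers of 5 become 15 buffers of 7.

```javascript
// Split an array into consecutive chunks of `size` (the last may be shorter).
function chunk(items, size) {
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

const numbers = Array.from({ length: 100 }, (_, i) => i); // 0..99
const source = chunk(numbers, 5);       // 20 buffers of 5
const target = chunk(source.flat(), 7); // flatten, then 15 buffers (the last holds 2 items)
console.log(source.length, target.length, target[target.length - 1]);
```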

How do I obtain a rolling buffer of the last two items emitted from a reactive stream?

RXJS 4

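You may not even need a buffer for this; a simple concatMap might work for you (of course, I don't know the details of your stream). If you do want a buffer, bufferWithCount(2, 1) emits a sliding buffer of two items that advances by one item each time: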

const observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);

observable
    .bufferWithCount(2, 1)
    .subscribe(all => {
        console.log(all);
    });
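
For comparison, here is what a sliding window of size two looks like on a plain array. `slidingWindows` is a hypothetical helper, not part of RxJS, and this sketch keeps only full windows; the Rx operator may also emit a shorter trailing buffer on completion, which you can filter out by length.

```javascript
// Sliding windows of `size` consecutive items, advancing one item at a time.
// Only full windows are returned.
function slidingWindows(items, size) {
  const out = [];
  for (let i = 0; i + size <= items.length; i++) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Each pair holds the previous item and the current one.
console.log(slidingWindows(["A", "B", "C", "D", "E", "F"], 2));
```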


Buffer until quiet behavior from Reactive?

Some similar questions exist on SO but not exactly like this.
Here's an extension method that does the trick.

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(
    this IObservable<TSource> source,
    int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Buffer(maxAmount).Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

Reactive Throttle returning all items added within the TimeSpan

As I answered in the other post, yes you can! Using the Throttle and Window methods of Observable:

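public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    // Publish shares a single subscription to the source between Throttle and Window,
    // so a cold source is not subscribed to twice.
    return stream.Publish(shared =>
    {
        var closes = shared.Throttle(delay);
        return shared.Window(() => closes).SelectMany(window => window.ToList());
    });
}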

Is there a Reactive Framework Subject that buffers until it doesn't need to?

The answer appears to be "No". You could write one on your own with relatively little trouble, though it would look a lot like a ReplaySubject, just one which drops its queue entirely upon subscription. Constructing one from pieces is fraught with race conditions.

The problem arose for me when I wrote an API that exposed IObservable<TConnection>, where TConnection had a member IObservable<TMessage>. People who observed a new TConnection needed to be sure that they didn't miss any TMessages on that connection, and the underlying event source made that slightly awkward. My requirement is that an observer who does not want to miss any messages must subscribe to the IObservable<TMessage> before the call to OnNext(TConnection) completes. This is almost always what someone will want to do anyway; they just need to be sure they don't do something silly like .ObserveOn(Scheduler.TaskPool) on their IObservable<TConnection>. If programmers agree to do this, then no buffering is required at all (in this particular implementation) and a regular Subject will do the trick.
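
As a rough illustration of the "write one on your own" option, here is a minimal sketch in plain JavaScript. The class name and its `next`/`subscribe` shape are hypothetical, not part of any Rx library, and single-threaded JavaScript sidesteps the race conditions a .NET version would have to guard against.

```javascript
// Hypothetical minimal subject: values are queued while nobody is listening,
// replayed once to the first observer, and then the queue is dropped.
class BufferUntilSubscribedSubject {
  constructor() {
    this.observers = [];
    this.queue = []; // holds values only while there are no observers
  }

  next(value) {
    if (this.observers.length === 0) {
      this.queue.push(value);
    } else {
      // Iterate over a copy so that unsubscribing during delivery is safe.
      for (const observer of [...this.observers]) observer(value);
    }
  }

  subscribe(observer) {
    this.observers.push(observer);
    const pending = this.queue;
    this.queue = []; // drop the queue entirely upon subscription
    for (const value of pending) observer(value);
    // The returned function unsubscribes this observer.
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }
}

const subject = new BufferUntilSubscribedSubject();
subject.next(1);
subject.next(2);
const seenByFirst = [];
const seenBySecond = [];
subject.subscribe(v => seenByFirst.push(v));  // receives the queued 1 and 2
subject.subscribe(v => seenBySecond.push(v)); // the queue is already empty
subject.next(3);                              // delivered live to both
console.log(seenByFirst, seenBySecond);
```

Note that this sketch starts queueing again once every observer has unsubscribed; whether that is desirable depends on the API you are exposing.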

Reactive Extensions Buffer on count, interval and event

This is the working solution I have. Extra console.log()'s are added to show the sequence of events.

The only thing that's a bit bothersome is the .skip(1) in fullBufferTrigger, but it's needed: the trigger fires when its buffer is full (natch), but the buffer in bufferedEvent$ does not seem to have the latest event before it's triggered.

Luckily, with the timeoutTrigger in place, the last event gets emitted. Without timeout, fullBufferTrigger by itself will not emit the final event.

Also, I changed buffer to bufferWhen, as the former did not seem to work with two triggers, although you'd expect it to from the documentation.
Footnote: with buffer(race()), the race only completes once, so whichever trigger got there first is used thereafter and the other triggers are disregarded. In contrast, bufferWhen(x => race()) re-evaluates the race each time a new buffer is opened.

// Assumes RxJS 5 is loaded as the global Rx
const { Observable, Subject } = Rx

const bufferPeriodMs = 1000

const event$ = new Subject()
event$.subscribe(event => console.log('event$ emit', event))

// Define triggers here for testing individually
const beforeunloadTrigger = Observable.fromEvent(window, 'beforeunload')
const fullBufferTrigger = event$.skip(1).bufferCount(2)
const timeoutTrigger = Observable.interval(bufferPeriodMs).take(10)

const bufferedEvent$ = event$
  .bufferWhen( x =>
    Observable.race(
      fullBufferTrigger,
      timeoutTrigger
    )
  )
  .filter(events => events.length > 0)

// output
fullBufferTrigger.subscribe(x => console.log('fullBufferTrigger', x))
timeoutTrigger.subscribe(x => console.log('timeoutTrigger', x))
bufferedEvent$.subscribe(events => {
  console.log('subscription', events)
})

// Test sequence
const delayBy = n => (bufferPeriodMs * n) + 500
event$.next('event1')
event$.next('event2')
event$.next('event3')

setTimeout( () => {
  event$.next('event4')
}, delayBy(1))

setTimeout( () => {
  event$.next('event5')
}, delayBy(2))

setTimeout( () => {
  event$.next('event6')
  event$.next('event7')
}, delayBy(3))


Edit: Alternative way to trigger the buffer

Since the combination of bufferWhen and race might be a bit inefficient (the race is rebuilt for every buffer), an alternative is to merge the triggers into one stream and use a simple buffer:

const bufferTrigger$ = timeoutTrigger
  .merge(fullBufferTrigger)
  .merge(beforeunloadTrigger)

const bufferedEvent$ = event$
  .buffer(bufferTrigger$)
  .filter(events => events.length > 0)
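
The idea of funnelling every trigger into one flush can also be sketched without RxJS. The `createBuffer` helper below is hypothetical and only models the behaviour: a count trigger is built in, and any other trigger (a timer, a page-unload handler) simply calls the same `flush`.

```javascript
// Hypothetical buffer where all triggers lead to a single flush function.
function createBuffer(maxCount, onFlush) {
  let items = [];

  const flush = () => {
    if (items.length === 0) return; // same role as the length filter above
    const batch = items;
    items = [];
    onFlush(batch);
  };

  const push = item => {
    items.push(item);
    if (items.length >= maxCount) flush(); // "buffer full" trigger
  };

  return { push, flush };
}

const batches = [];
const buffer = createBuffer(2, batch => batches.push(batch));
buffer.push('event1');
buffer.push('event2'); // full: flushes ['event1', 'event2']
buffer.push('event3');
buffer.flush();        // stands in for a timer or beforeunload trigger
buffer.flush();        // nothing buffered, so nothing is emitted
console.log(batches);
```

In a browser, the other triggers could be wired up with `setInterval(buffer.flush, 1000)` and `window.addEventListener('beforeunload', buffer.flush)`.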

reactive extensions sliding time window

Would this extension method solve your problem?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
            {
                list.RemoveFirst();
            }
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}
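
The eviction logic is the interesting part, so here is a minimal sketch of it in plain JavaScript. The class and its explicit `now` parameter are assumptions made so the behaviour can be checked without real clocks; it is not an Rx operator.

```javascript
// Keeps only the values that arrived within the last `windowMs` milliseconds.
class RollingBuffer {
  constructor(windowMs) {
    this.windowMs = windowMs;
    this.items = []; // { t, value } in arrival order
  }

  // `now` defaults to the wall clock but can be supplied explicitly.
  push(value, now = Date.now()) {
    this.items.push({ t: now, value });
    // Evict from the front while the oldest item is outside the window.
    // The item just added is never evicted, so the array is never empty here.
    while (this.items[0].t < now - this.windowMs) {
      this.items.shift();
    }
    return this.items.map(item => item.value);
  }
}

const rolling = new RollingBuffer(1000);
console.log(rolling.push('a', 0));    // only 'a' so far
console.log(rolling.push('b', 600));  // 'a' is still inside the window
console.log(rolling.push('c', 1200)); // 'a' (t = 0) is older than 200 and is dropped
console.log(rolling.push('d', 5000)); // everything before 4000 is dropped
```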

Combine streams with maximum time interval between values

You can just use Rx combinators. Since your primary aim is Zip, let's start with Zip and then apply your expiry conditions.

public static IObservable<TOut> ZipWithExpiry<TLeft, TRight, TOut>(
    IObservable<TLeft> left,
    IObservable<TRight> right,
    Func<TLeft, TRight, TOut> selector,
    TimeSpan validity)
{
    return Observable.Zip(left.Timestamp(), right.Timestamp(), (l, r) => Tuple.Create(l, r))
                     .Where(tuple => Math.Abs((tuple.Item1.Timestamp - tuple.Item2.Timestamp).TotalSeconds) < validity.TotalSeconds)
                     .Select(tuple => selector(tuple.Item1.Value, tuple.Item2.Value));
}

If you want to check the adjacent values in a stream, you can rewrite it using the TimeInterval operator instead of Timestamp.
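
The Zip-then-filter idea can be sketched on plain arrays of timestamped items. The `{ t, value }` shape and the function below are assumptions for illustration only; pairing is by position, as Zip does, and a pair is kept only when its two timestamps are close enough.

```javascript
// Pair items by position and keep pairs whose timestamps differ by less than `validityMs`.
function zipWithExpiry(left, right, selector, validityMs) {
  const out = [];
  const n = Math.min(left.length, right.length); // Zip stops at the shorter input
  for (let i = 0; i < n; i++) {
    const l = left[i];
    const r = right[i];
    if (Math.abs(l.t - r.t) < validityMs) {
      out.push(selector(l.value, r.value));
    }
  }
  return out;
}

const left = [{ t: 0, value: 'L1' }, { t: 100, value: 'L2' }, { t: 920, value: 'L3' }];
const right = [{ t: 10, value: 'R1' }, { t: 900, value: 'R2' }, { t: 950, value: 'R3' }];
// The second pair is 800 ms apart and is dropped; the other two are kept.
console.log(zipWithExpiry(left, right, (l, r) => l + r, 100));
```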


