A Way to Push Buffered Events in Even Intervals

A way to push buffered events in even intervals

It's actually tricker than it sounds.

Using Delay doesn't work because the values will still happen in bulk, only slightly delayed.

Using Interval with either CombineLatest or Zip doesn't work, since the former will cause source values to be skipped and the latter will buffer interval values.

I think the new Drain operator (added in 1.0.2787.0), combined with Delay should do the trick:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

The Drain operator works like SelectMany, but waits until the previous output completes before calling the selector with the next value. It's still not exactly what you are after (the first value in a block will also be delayed), but it's close: The usage above matches your marble diagram now.

Edit: Apparently the Drain in the framework doesn't work like SelectMany. I'll ask for some advice in the official forums. In the meantime, here's an implementation of Drain that does what you're after:

Edit 09/11: Fixed errors in implementation and updated usage to match your requested marble diagram.

public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}

Rx IObservable buffering to smooth out bursts of events

This is actually a duplicate of A way to push buffered events in even intervals, but I'll include a summary here (the original looks quite confusing because it looks at a few alternatives).

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
return source.Drain(x =>
Observable.Empty<int>()
.Delay(minDelay)
.StartWith(x)
);
}

My implementation of Drain works like SelectMany, except it waits for the previous output to finish first (you could think of it as ConactMany, whereas SelectMany is more like MergeMany). The built-in Drain does not work this way, so you'll need to include the implementation below:

public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(
this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}

How to buffer based on time and count, but stopping the timer if no events occur

The Where filter you propose is a sound approach, I'd go with that.

You could wrap the Buffer and Where into a single helper method named to make the intent clearer perhaps, but rest assured the Where clause is idiomatic Rx in this scenario.

Think of it this way; an empty Buffer is relaying information that no events occurred in the last second. While you can argue that this is implicit, it would require extra work to detect this if Buffer didn't emit an empty list. It just so happens it's not information you are interested in - so Where is an appropriate way to filter this information out.

A lazy timer solution

Following from your comment ("...the timer... be[ing] lazily initiated...") you can do this to create a lazy timer and omit the zero counts:

var source = Observable.Interval(TimeSpan.FromSeconds(3))
.Select(n => Observable.Repeat(n, 50))
.Merge();

var xs = source.Publish(pub =>
pub.Buffer(() => pub.Take(1).Delay(TimeSpan.FromSeconds(1))
.Merge(pub.Skip(19)).Take(1)));

xs.Subscribe(x => Console.WriteLine(x.Count));

Explanation

Publishing

This query requires subscribing to the source events multiple times. To avoid unexpected side-effects, we use Publish to give us pub which is a stream that multicasts the source creating just a single subscription to it. This replaces the older Publish().RefCount() technique that achieved the same end, effectively giving us a "hot" version of the source stream.

In this case, this is necessary to ensure the subsequent buffer closing streams produced after the first will start with the current events - if the source was cold they would start over each time. I wrote a bit about publishing here.

The main query

We use an overload of Buffer that accepts a factory function that is called for every buffer emitted to obtain an observable stream whose first event is a signal to terminate the current buffer.

In this case, we want to terminate the buffer when either the first event into the buffer has been there for a full second, or when 20 events have appeared from the source - whichever comes first.

To achieve this we Merge streams that describe each case - the Take(1).Delay(...) combo describes the first condition, and the Skip(19).Take(1) describes the second.

However, I would still test performance the easy way, because I still suspect this is overkill, but a lot depends on the precise details of the platform and scenario etc.

RX buffer events for several seconds after first event is triggered

I'm assuming you want the following behavior:

  1. After an initial event, buffer all events for the next 10 seconds.
  2. Once that 10 second window closes, the next should trigger a new 10 second buffer for all events 10 seconds after that.

So let's say we have 5 events evenly spread out in 5 seconds, a 13 second gap, then another 5 events evenly spread out in 5 seconds. Marble diagram would look like this:

timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27
events : x--x--x--x--x-------------------------------------x--x--x--x--x------------------
stdbuff : |----------------------------|-----------------------------|---------------------
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return

The problem with using straight-forward Buffer is that it would look like the stdbuff notated above, and break up the second group of events into two groups, resulting in two lists for the second group of events: one with three events, one with two events. You want one list (for that second group), using a logic like the desired stream. Start capturing at 0, return list at 10. Start capturing at 17, return list at 27.

If I'm misunderstanding you (again), the please post a marble diagram, similar to the above, representing how you want things to work.


Assuming I understand you correctly, the below code will work...

//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created))
// .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed)));

//Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier.
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
.Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13)))
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5));

initialSource
.Publish( _source => _source
.Buffer(_source
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
)
)
.Subscribe(list =>
{
Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}");
Debug.WriteLine($"List Count: {list.Count}");
});

Explanation:

First we need to identify 'primary events', those that represent the BeginCapture annotations in the desired stream depiction above. That can be found like this:

 var primaryEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged();

Once we have the BeginCapture events, which can represent a window opening, it's pretty easy to find the Return events, or the window closing:

 var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10));

In practice, since nothing happens between a close and an open that we care about, we really only need to worry about the close events, so we can shrink it to this:

 var closeEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10));

Plug that into Buffer with the closeEvents being the bufferBoundaries:

var bufferredLists = initialSource
.Buffer(initialsource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
);

Finally, since we have multiple subscriptions to initialSource, we'll want to use Publish to ensure concurrency works properly, leading to the final answer.

How to merge a nested observable IObservableIObservableT with limited concurrency and limited buffer capacity?

I came up with a functional solution, I'm not sure it's the way to go, just because of complexity. But I think I covered all the bases.

First, if you take a functional approach, this is a relatively simple state-machine problem: The state needs to know how many observables are currently executing and the buffer queue. The two events that can affect the state are a new Observable entering the buffer queue (causes an enqueue on the buffer queue), or a currently-executing observable terminating (causes a dequeue on the buffer queue).

Since state-machine basically means Scan, and Scan can only work with one type, we'll have to coerce our two events into one type, which I called Message below. The state machine then knows all and can do the work of the Merge(n) overload.

The last trick is the loop-back: Since the completing Observable is 'downstream' from Scan, we need to 'loop-back' the termination of that observable into Scan. For that, I always refer back to the Drain function in [this answer][1].

public static class X
{
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency,
int boundedCapacity)
{
return Observable.Defer(() =>
{
var capacityQueue = new Subject<Unit>();

var toReturn = source.Publish(_source => _source
.Select(o => Message.Enqueue(o))
.Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
.Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
{
var buffer = state.buffer;
var bufferCount = state.bufferCount;
var executionCount = state.executionCount;
if (message.IsEnqueue)
{
if (executionCount < maximumConcurrency)
return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);

buffer = buffer.Enqueue(message.Object);
if (bufferCount == boundedCapacity)
buffer = buffer.Dequeue();
else
bufferCount++;
return (bufferCount, buffer, executionCount, null);
}
else
{
if (bufferCount == 0)
return (0, buffer, executionCount - 1, null);
else
return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
}
})
.Where(t => t.item != null)
.Select(t => t.item)
.Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
.TakeUntil(_source.IgnoreElements().Materialize())
.Merge()
);

return toReturn;
});

}

public class Message
{
public static Message<T> Enqueue<T>(T t)
{
return Message<T>.Enqueue(t);
}

public static Message<T> Dequeue<T>(T t)
{
return Message<T>.Dequeue(t);
}

}

public class Message<T>
{
private readonly T _t;
private readonly bool _isEnqueue;
private Message(bool isEnqueue, T t)
{
_t = t;
_isEnqueue = isEnqueue;
}

public static Message<T> Enqueue(T t)
{
return new Message<T>(true, t);
}

public static Message<T> Dequeue(T t)
{
return new Message<T>(false, t);
}

public bool IsEnqueue => _isEnqueue;
public T Object => _t;
}
}

I wrote some test-code (based on original question) to verify, if you want to piggy back off of that. Test now passing:

//              T: 0123456789012345678901234567890123
// T10: 0 1 2 3
// Source: +----A------B------C------|
// A: +-------a----a---|
// B: +----------b----b---|
// C: +--------c----|
// ExpectedResult: +------------a----a---------c----|

var ts = new TestScheduler();

var A = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1, 1);
testResult.Subscribe(observer);

var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual"); // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);

(Test code passes without exception)

Process Rx events at fixed or minimum intervals

I answered this very question on my blog here.

Reproducing (in case of link rot!) with the addition of presenting as an extension method:

Constraining a stream of events in Rx to a maximum rate

Sometimes, you want to limit the rate at which events arrive from an Rx stream.

The Throttle operator will suppress an event if another arrives within a specified interval. This is very useful in many instances, but it does have two important side-effects – even an unsuppressed event will be delayed by the interval, and events will get dropped altogether if they arrive too quickly.

I came across a situation where both of these were unacceptable. In this particular case, the desired behaviour was as follows: The events should be output at a maximum rate specified by a TimeSpan, but otherwise as soon as possible.

One solution works like this. Imagine our input stream is a bunch of people arriving at a railway station. For our output, we want people leave the station at a maximum rate. We set the maximum rate by having each person stand at the front of a flatbed railroad truck and sending that truck out of the station at a fixed speed. Because there is only one track, and all trucks travel at the same speed and have the same length, people will leave the station at a maximum rate when trucks are departing back-to-back. However, if the track is clear, the next person will be able to depart immediately.

So how do we translate this metaphor into Rx?

We will use the Concat operator’s ability to accept a stream of streams and merge them together back-to-back – just like sending railroad trucks down the track.

To get the equivalent of each person onto a railroad truck, we will use a Select to project each event (person) to an observable sequence (railroad truck) that starts with a single OnNext event (the person) and ends with an OnComplete exactly the defined interval later.

Lets assume the input events are an IObservable in the variable input. Here’s the code:

var paced = input.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i)).Concat();

As an extension method this becomes:

public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
return source.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i)).Concat();

}

How to subscribe to, but buffer data from, an IObservable until another IObservable has published?

This works for me:

var r = Observable.Create<int>(o =>
{
var rs = new ReplaySubject<int>();
var subscription1 = s1.Subscribe(rs);
var query = from f in s2.Take(1) select rs.AsObservable();
var subscription2 = query.Switch().Subscribe(o);
return new CompositeDisposable(subscription1, subscription2);
});

Reactive Extensions Buffer on count, interval and event

This is the working solution I have. Extra console.log()'s are added to show the sequence of events.

The only thing that's a bit bothersome is the .skip(1) in fullBufferTrigger, but it's needed as it will trigger when it's buffer is full (natch), but the buffer in bufferedEvent$ does not seem to have the latest event before it's triggered.

Luckily, with the timeoutTrigger in place, the last event gets emitted. Without timeout, fullBufferTrigger by itself will not emit the final event.

Also, changed buffer to bufferWhen, as the former did not seem to trigger with two triggers, although you'd expect it to from the documentation.
footnote with buffer(race()) the race only completes once, so whichever trigger got there first will thereafter be used and the other triggers dis-regarded. In contrast, bufferWhen(x => race()) evaluates every time an event occurs.

const bufferPeriodMs = 1000

const event$ = new Subject()
event$.subscribe(event => console.log('event$ emit', event))

// Define triggers here for testing individually
const beforeunloadTrigger = Observable.fromEvent(window, 'beforeunload')
const fullBufferTrigger = event$.skip(1).bufferCount(2)
const timeoutTrigger = Observable.interval(bufferPeriodMs).take(10)

const bufferedEvent$ = event$
.bufferWhen( x =>
Observable.race(
fullBufferTrigger,
timeoutTrigger
)
)
.filter(events => events.length > 0)

// output
fullBufferTrigger.subscribe(x => console.log('fullBufferTrigger', x))
timeoutTrigger.subscribe(x => console.log('timeoutTrigger', x))
bufferedEvent$.subscribe(events => {
console.log('subscription', events)
})

// Test sequence
const delayBy = n => (bufferPeriodMs * n) + 500
event$.next('event1')
event$.next('event2')
event$.next('event3')

setTimeout( () => {
event$.next('event4')
}, delayBy(1))

setTimeout( () => {
event$.next('event5')
}, delayBy(2))

setTimeout( () => {
event$.next('event6')
event$.next('event7')
}, delayBy(3))

Working example: CodePen

Edit: Alternative way to trigger the buffer

Since the combination of bufferWhen and race might be a bit inefficient (the race is restarted each event emission), an alternative is to merge the triggers into one stream and use a simple buffer

const bufferTrigger$ = timeoutTrigger
.merge(fullBufferTrigger)
.merge(beforeunloadTrigger)

const bufferedEvent$ = event$
.buffer(bufferTrigger$)
.filter(events => events.length > 0)

Smoothing Rx Observables

How about this? (inspired by James' answer mentioned in the comments)...

public static IObservable<T> Regulate<T>(this IObservable<T> source, TimeSpan period)
{
var interval = Observable.Interval(period).Publish().RefCount();

return source.Select(x => Observable.Return(x)
.CombineLatest(interval, (v, _) => v)
.Take(1))
.Concat();
}

It turns each value in the raw observable into its own observable. The CombineLatest means it won't produce a value until the interval does. Then we just take one value from each of these observables and concatenate.

The first value in the raw observable gets delayed by one period. I'm not sure if that is an issue for you or not.

What is the best way to rate limit consuming of an Observable?

The question is not 100% clear so I'm making some presumptions.

Observable.Delay is not what you want because that will create a delay from when each event arrives, rather than creating even time intervals for processing.

Observable.Buffer is not what you want because that will cause all events in each given interval to be passed to you, rather than one at a time.

So I believe you're looking for a solution that creates some sort of metronome that ticks away, and gives you an event per tick. This can be naively constructed using Observable.Interval for the metronome and Zip for connecting it to your source:

var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));
var triggeredSource = source.Zip(trigger, (s,_) => s);
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));

This will trigger every 5 seconds (in the example above), and give you the original items in sequence.

The only problem with this solution is that if you don't have any more source elements for (say) 10 seconds, when the source elements arrive they will be immediately sent out since some of the 'trigger' events are sitting there waiting for them. Marble diagram for that scenario:

source:  -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result: ----a----b----c-------------d-e-f-g

This is a very reasonable issue. There are two questions here already that tackle it:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

The solution provided is a main Drain extension method and secondary Buffered extension. I've modified these to be far simpler (no need for Drain, just use Concat). Usage is:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));

The extension method StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
return source.Select(x =>
Observable.Empty<T>()
.Delay(minDelay)
.StartWith(x)
).Concat();
}


Related Topics



Leave a reply



Submit