How to Throttle Event Stream Using Rx

Here's what I got, with some help from the Rx forum:

The idea is to issue a series of "tickets" that allow the original sequence to fire. These "tickets" are delayed by the timeout, except for the very first one, which is immediately prepended to the ticket sequence. When an event comes in and there is a ticket waiting, the event fires immediately; otherwise it waits until the next ticket arrives and then fires. When it fires, the next ticket is issued, and so on...

To combine the tickets with the original events, we need a combinator. Unfortunately, the "standard" .CombineLatest cannot be used here, because it would fire on tickets and events that were used previously. So I had to create my own combinator, which is basically a filtered .CombineLatest that fires only when both elements in the combination are "fresh", i.e. have never been returned before. I call it .CombineVeryLatest, aka .BrokenZip ;)

Using .CombineVeryLatest, the above idea can be implemented as such:

public static IObservable<T> SampleResponsive<T>(
    this IObservable<T> source, TimeSpan delay)
{
    return source.Publish(src =>
    {
        var fire = new Subject<T>();

        var whenCanFire = fire
            .Select(u => new Unit())
            .Delay(delay)
            .StartWith(new Unit());

        var subscription = src
            .CombineVeryLatest(whenCanFire, (x, flag) => x)
            .Subscribe(fire);

        return fire.Finally(subscription.Dispose);
    });
}

public static IObservable<TResult> CombineVeryLatest<TLeft, TRight, TResult>(
    this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector)
{
    var ls = leftSource.Select(x => new Used<TLeft>(x));
    var rs = rightSource.Select(x => new Used<TRight>(x));
    var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
    var fltCmb = cmb
        .Where(a => !(a.x.IsUsed || a.y.IsUsed))
        .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
    return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
}

private class Used<T>
{
    internal T Value { get; private set; }
    internal bool IsUsed { get; set; }

    internal Used(T value)
    {
        Value = value;
    }
}
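
A quick usage sketch (the Observable.Interval source below is just a placeholder for any busy stream; it is not part of the original post):

// Illustrative only: the first event fires immediately, then at most one event per second.
var busySource = Observable.Interval(TimeSpan.FromMilliseconds(100));

using (busySource.SampleResponsive(TimeSpan.FromSeconds(1))
                 .Subscribe(x => Console.WriteLine($"fired: {x}")))
{
    Console.ReadLine();
}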

Edit: here's another more compact variation of CombineVeryLatest proposed by Andreas Köpf on the forum:

public static IObservable<TResult> CombineVeryLatest<TLeft, TRight, TResult>(
    this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
                leftSource.Select(Tuple.Create<TLeft, int>),
                rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}
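
To see what the "BrokenZip" behaviour means in practice, here is a small illustrative comparison (the interval sources are placeholders, not from the original post):

// Illustrative only: CombineLatest would re-fire on every change of either side,
// Zip would pair strictly in lockstep, while CombineVeryLatest fires only when
// both sides have produced a value that was not used in a previous pairing.
var left = Observable.Interval(TimeSpan.FromMilliseconds(300));
var right = Observable.Interval(TimeSpan.FromMilliseconds(500));

left.CombineVeryLatest(right, (l, r) => $"{l}/{r}")
    .Subscribe(Console.WriteLine);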

'Sliding' Rx.NET .Throttle() Window

As @nikoniko already mentioned, Throttle will do the trick.

using System;
using System.Reactive.Linq;

namespace Printing
{
    class Program
    {
        static void Main(string[] args)
        {
            var source = Observable.Interval(TimeSpan.FromMilliseconds(333))
                .Do(i => Console.WriteLine($"new item: {i}"));
            var sampling = source.Throttle(TimeSpan.FromSeconds(1))
                .Do(i => Console.WriteLine($"sampled: {i}"));

            var subscription = sampling.Subscribe();

            Console.ReadLine();

            subscription.Dispose();

            Console.ReadLine();
        }
    }
}
This results in nothing, because the events from source arrive at too high a frequency. But if the source needs more time to deliver an element than the timespan given to Throttle:

using System;
using System.Reactive.Linq;

namespace Printing
{
    class Program
    {
        static void Main(string[] args)
        {
            var source = Observable.Interval(TimeSpan.FromSeconds(1.2))
                .Do(i => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: new item: {i}"));
            var sampling = source.Throttle(TimeSpan.FromSeconds(1))
                .Do(i => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: throttle {i}"));

            var subscription = sampling.Subscribe();

            Console.ReadLine();

            subscription.Dispose();

            Console.ReadLine();
        }
    }
}

The result appears once the throttling time is over. As you can see, one second after an event is fired by the source, it appears in the result:

08:32:26: new item: 0
08:32:27: throttle 0
08:32:28: new item: 1
08:32:29: throttle 1
08:32:30: new item: 2
08:32:31: throttle 2
08:32:32: new item: 3
08:32:33: throttle 3
08:32:34: new item: 4
08:32:35: throttle 4
08:32:36: new item: 5
08:32:37: throttle 5

How can I use Reactive Extensions to throttle Events using a max window size?

As James stated, Observable.Sample will give you the latest value yielded. However, it does so on a timer, not in accordance with when the first event in the throttle window occurred. More importantly, if your sample time is long (say, ten seconds) and your event fires right after a sample is taken, you won't get that new event for almost ten seconds.
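
For reference, this is the kind of Sample usage being described (the source and interval here are illustrative, not from the original question):

// Illustrative only: Sample emits the most recent source value on a fixed timer,
// regardless of when that value actually arrived.
var source = new Subject<int>();

source.Sample(TimeSpan.FromSeconds(10))
      .Subscribe(x => Console.WriteLine($"sampled: {x}"));

// A value pushed just after a sampling tick won't surface for almost ten seconds.
source.OnNext(42);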

If you need something a little tighter, you'll need to implement your own function. I've taken the liberty of doing so. This code could definitely use some cleanup, but I believe it does what you've asked for.

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObservableEx
{
    public static IObservable<T> ThrottleMax<T>(
        this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
    {
        return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
    }

    public static IObservable<T> ThrottleMax<T>(
        this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            var hasValue = false;
            T value = default(T);

            var maxTimeDisposable = new SerialDisposable();
            var dueTimeDisposable = new SerialDisposable();

            Action action = () =>
            {
                if (hasValue)
                {
                    maxTimeDisposable.Disposable = Disposable.Empty;
                    dueTimeDisposable.Disposable = Disposable.Empty;
                    o.OnNext(value);
                    hasValue = false;
                }
            };

            return source.Subscribe(
                x =>
                {
                    if (!hasValue)
                    {
                        // start the max-window timer only on the first value of a burst
                        maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action);
                    }

                    hasValue = true;
                    value = x;
                    // restart the quiet-period timer on every value
                    dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action);
                },
                o.OnError,
                o.OnCompleted);
        });
    }
}
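
A minimal usage sketch (keyPresses and RunSearch are placeholders, not part of the original answer):

// Illustrative only: emit after 500 ms of quiet, but never wait more than
// 2 s from the first event of a burst.
IObservable<string> keyPresses = GetKeyPressStream(); // hypothetical source
var searchTrigger = keyPresses.ThrottleMax(
    TimeSpan.FromMilliseconds(500),
    TimeSpan.FromSeconds(2));

searchTrigger.Subscribe(text => RunSearch(text)); // hypothetical handler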

And a few tests...

using System;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class ThrottleMaxTests : ReactiveTest
{
    [TestMethod]
    public void CanThrottle()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1)
        );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1)
        );
    }

    [TestMethod]
    public void CanThrottleWithMaximumInterval()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(175, 2),
            OnNext(250, 3),
            OnNext(325, 4),
            OnNext(400, 5)
        );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(350, 4),
            OnNext(500, 5)
        );
    }

    [TestMethod]
    public void CanThrottleWithoutMaximumIntervalInterference()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(325, 2)
        );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(425, 2)
        );
    }
}

Rx: How can I respond immediately, and throttle subsequent requests

The initial answer I posted has a flaw: namely, that the Window method, when used with an Observable.Interval to denote the end of the window, sets up an infinite series of 500ms windows. What I really need is a window that starts when the first result is pumped into the subject, and ends 500ms later.

My sample data masked this problem because the data broke down nicely into the windows that were already going to be created. (i.e. 0-500ms, 501-1000ms, 1001-1500ms, etc.)
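
For reference, the flawed version looked something like this (a sketch reconstructed from the description above, not the original code):

// Sketch of the flawed approach: an Interval-driven Window produces fixed 500ms
// windows that start at subscription time, not when the first item arrives.
subject
    .Window(Observable.Interval(TimeSpan.FromMilliseconds(500)))
    .SelectMany(window => window.Take(1))
    .Subscribe(i => DoStuff(i));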

Consider instead this timing:

factory.StartNewDelayed(300, () =>
{
    Console.WriteLine("Batch 1 (300ms delay)");
    subject.OnNext(1);
});

factory.StartNewDelayed(700, () =>
{
    Console.WriteLine("Batch 2 (700ms delay)");
    subject.OnNext(2);
});

factory.StartNewDelayed(1300, () =>
{
    Console.WriteLine("Batch 3 (1.3s delay)");
    subject.OnNext(3);
});

factory.StartNewDelayed(1600, () =>
{
    Console.WriteLine("Batch 4 (1.6s delay)");
    subject.OnNext(4);
});

What I get is:

Batch 1 (300ms delay)
Handling 1 at 356ms
Batch 2 (700ms delay)
Handling 2 at 750ms
Batch 3 (1.3s delay)
Handling 3 at 1346ms
Batch 4 (1.6s delay)
Handling 4 at 1644ms

This is because the windows begin at 0ms, 500ms, 1000ms, and 1500ms and so each Subject.OnNext fits nicely into its own window.

What I want is:

Batch 1 (300ms delay)
Handling 1 at ~300ms
Batch 2 (700ms delay)
Batch 3 (1.3s delay)
Handling 3 at ~1300ms
Batch 4 (1.6s delay)

After a lot of struggling and an hour banging on it with a co-worker, we arrived at a better solution using pure Rx and a single local variable:

bool isCoolingDown = false;

subject
    .Where(_ => !isCoolingDown)
    .Subscribe(
        i =>
        {
            DoStuff(i);

            isCoolingDown = true;

            Observable
                .Interval(cooldownInterval)
                .Take(1)
                .Subscribe(_ => isCoolingDown = false);
        });

Our assumption is that calls to the subscription method are synchronized. If they are not, then a simple lock could be introduced.
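
If that assumption doesn't hold, a lock-based variant might look like this (a sketch, not part of the original answer):

// Sketch: guard the cooldown flag with a lock if OnNext calls
// can arrive concurrently from multiple threads.
var gate = new object();
bool isCoolingDown = false;

subject.Subscribe(i =>
{
    lock (gate)
    {
        if (isCoolingDown) return;
        isCoolingDown = true;
    }

    DoStuff(i);

    Observable
        .Interval(cooldownInterval)
        .Take(1)
        .Subscribe(_ => { lock (gate) { isCoolingDown = false; } });
});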

How to use Reactive Extensions to cache, throttle, and relay multiple events?

Ah, I love playing with Rx... here's one approach: first part by part, then the whole thing:

EDIT: modified to suit comments

First, you'll need to set up the streams for the events - While we're at it, let's replace the "old" .NET event pattern the bridge exposes with the "new hawtness" of Rx:

public Subject<Thing> BufferedAdds {get; private set;}
public Subject<Thing> BufferedChanges {get; private set;}
public Subject<Thing> BufferedRemoves {get; private set;}

_adds = Observable.FromEvent<EventDelegateAdd, Thing>(
    ev => new EventDelegateAdd(ev),
    h => mSource.AddingThing += h,
    h => mSource.AddingThing -= h);
BufferedAdds = new Subject<Thing>();

_changes = Observable.FromEvent<EventDelegateChange, Thing>(
    ev => new EventDelegateChange(ev),
    h => mSource.ChangingThing += h,
    h => mSource.ChangingThing -= h);
BufferedChanges = new Subject<Thing>();

_removes = Observable.FromEvent<EventDelegateRemove, Thing>(
    ev => new EventDelegateRemove(ev),
    h => mSource.RemovingThing += h,
    h => mSource.RemovingThing -= h);
BufferedRemoves = new Subject<Thing>();

We're also going to tie everything to an IScheduler now - IScheduler usage is a key facet for testing this bloody stuff without Thread.Sleep'ing all over the place - I highly recommend researching this topic!

public MyEventCachingBridge(
    MyCustomDataSource source,
    int eventRelayInterval,
    IScheduler scheduler)
{

Then you'll want to combine all the incoming events into a single stream, then "chunk" that stream based on a time window - the Buffer operator is ideal for this:

_buffer = Observable.Merge(scheduler,
        _adds.Select(e => Tuple.Create(e, ThingEventType.Add)),
        _changes.Select(e => Tuple.Create(e, ThingEventType.Change)),
        _removes.Select(e => Tuple.Create(e, ThingEventType.Remove)))
    .Buffer(TimeSpan.FromMilliseconds(eventRelayInterval), scheduler);

Notice I'm packing the type of the event back into the stream - this is so we can take the appropriate action during the playback - the enum is:

private enum ThingEventType
{
    Add,
    Change,
    Remove
}

So now we'll need something to listen to and hold the batched events - there are a number of options here, but let's use a simple List with synchronization:

private List<IList<Tuple<Thing, ThingEventType>>> _eventQueue;
private static object SyncRoot = new object();

_eventQueue = new List<IList<Tuple<Thing, ThingEventType>>>();

// A SerialDisposable is a sort of "disposable holder" - when you change its
// Disposable member, it auto-disposes what you originally had there...no real
// need for it here, but potentially useful later
_watcherDisposable = new SerialDisposable();
_watcherDisposable.Disposable = _buffer
    .ObserveOn(_scheduler)
    .Subscribe(batch =>
    {
        lock (SyncRoot) { _eventQueue.Add(batch); }
    });
_disposables.Add(_watcherDisposable);

Let's also "auto-wire" the Playback burst to pulse every eventRelayInterval ms:

var pulse = Observable.Interval(
    TimeSpan.FromMilliseconds(eventRelayInterval),
    _scheduler);
_disposables.Add(pulse
    .ObserveOn(_scheduler)
    .Subscribe(x => PlayBackCachedEvents()));

Subscriptions are always IDisposable, and you will want to dispose them, so let's add in some stuff for that:

public class MyEventCachingBridge : IDisposable
{
    CompositeDisposable _disposables;

    public void Dispose()
    {
        _disposables.Dispose();
    }

And now for the playback:

public void PlayBackCachedEvents()
{
    BulkChangesStart(); // Raise event to notify GUI to suspend screen updates

    // Play back the list of events to push changes
    lock (SyncRoot)
    {
        foreach (var batch in _eventQueue)
        {
            // Play back the list of events to push changes to ListView, TreeView, graphs, etc...
            foreach (var evt in batch)
            {
                switch (evt.Item2)
                {
                    case ThingEventType.Add: BufferedAdds.OnNext(evt.Item1); break;
                    case ThingEventType.Change: BufferedChanges.OnNext(evt.Item1); break;
                    case ThingEventType.Remove: BufferedRemoves.OnNext(evt.Item1); break;
                }
            }
        }
        _eventQueue.Clear();
    }

    BulkChangesEnd(); // Raise event to notify GUI to allow control refresh
}

NOW - we want to be all fancy on the consumer side as well, so let's mock up a UI window (this is WPF, adjust accordingly):

public class BridgeConsumer : Window, IDisposable
{
    private readonly CompositeDisposable _disposables;
    private IScheduler _scheduler;
    private StackPanel _panel;

    public void OnLoaded(object sender, RoutedEventArgs ea)
    {
        _panel = new StackPanel();
        this.Content = _panel;
    }

    public BridgeConsumer(MyEventCachingBridge bridge, IScheduler scheduler)
    {
        // for cleanup of any subscriptions
        _disposables = new CompositeDisposable();
        _disposables.Add(bridge);
        _scheduler = scheduler;

        Loaded += OnLoaded;

        // setup a listener for the bulk start/end events on the bridge
        var bulkStart = Observable.FromEvent(
            h => bridge.BulkChangesStart += new EventDelegateBulkChangesStart(h),
            h => bridge.BulkChangesStart -= new EventDelegateBulkChangesStart(h));
        var bulkEnd = Observable.FromEvent(
            h => bridge.BulkChangesEnd += new EventDelegateBulkChangesEnd(h),
            h => bridge.BulkChangesEnd -= new EventDelegateBulkChangesEnd(h));

        // the "meaty bit" -
        // 1. create a "window" defined by bulk start/end events
        // 2. inside that "window", trap any occurrences on a
        //    merged view of adds/changes/removes
        // 3. foreach event in that window, select that event
        //    (i.e., give us window contents as a stream of sorts)
        var bridgeWatcher =
            from thingEventWindow in
                Observable.Merge(
                    bridge.BufferedAdds.Select(t => Tuple.Create("add", t)),
                    bridge.BufferedChanges.Select(t => Tuple.Create("change", t)),
                    bridge.BufferedRemoves.Select(t => Tuple.Create("remove", t)))
                .Window(bulkStart, start => bulkEnd)
            from thingEvent in thingEventWindow
            select thingEvent;

        // this could just as easily be a method, a bound call to the viewmodel, etc
        Action<Thing, string, DateTimeOffset> addToList = (thing, msg, ts) =>
        {
            var text = new TextBlock()
            {
                Text = string.Format(
                    "At:{0} Key:{2} Msg:{3} - nowTime = {1}",
                    thing.TimeStamp,
                    ts,
                    thing.Key,
                    msg)
            };
            _panel.Children.Add(text);
        };

        _disposables.Add(bridgeWatcher
            // CAREFUL! "ObserveOn" means what you'd think "SubscribeOn" would
            .ObserveOnDispatcher()
            .Subscribe(tup =>
            {
                addToList(tup.Item2, tup.Item1, _scheduler.Now);
            }));
    }

    public void Dispose()
    {
        // clean up
        if (_disposables != null) _disposables.Dispose();
    }
}

The whole shebang:

void Main()
{
    var scheduler = Scheduler.Default;
    var rnd = new Random();
    var canceller = new CancellationTokenSource();

    var source = new MyCustomDataSource();
    var eventRelayInterval = 2000;
    var bridge = new MyEventCachingBridge(source, eventRelayInterval, scheduler);

    var window = new BridgeConsumer(bridge);
    window.Closed += (o, e) => { canceller.Cancel(); window.Dispose(); };
    window.Show();

    Task.Factory.StartNew(
        () =>
        {
            while (true)
            {
                var thing = new Thing()
                {
                    Key = "added thing " + rnd.Next(0, 100),
                    Title = "title for added thing",
                    TimeStamp = scheduler.Now.DateTime
                };
                source.FireAdd(thing);
                Thread.Sleep(rnd.Next(1, 10) * 100);
            }
        }, canceller.Token);
}

public class BridgeConsumer : Window, IDisposable
{
    private readonly CompositeDisposable _disposables;
    private StackPanel _panel;

    public void OnLoaded(object sender, RoutedEventArgs ea)
    {
        _panel = new StackPanel();
        this.Content = _panel;
    }

    public BridgeConsumer(MyEventCachingBridge bridge)
    {
        _disposables = new CompositeDisposable();
        _disposables.Add(bridge);

        Loaded += OnLoaded;

        var bulkStart = Observable.FromEvent(
            h => bridge.BulkChangesStart += new EventDelegateBulkChangesStart(h),
            h => bridge.BulkChangesStart -= new EventDelegateBulkChangesStart(h));
        var bulkEnd = Observable.FromEvent(
            h => bridge.BulkChangesEnd += new EventDelegateBulkChangesEnd(h),
            h => bridge.BulkChangesEnd -= new EventDelegateBulkChangesEnd(h));

        var bridgeWatcher =
            from thingEventWindow in
                Observable.Merge(
                    bridge.BufferedAdds.Select(t => Tuple.Create("add", t)),
                    bridge.BufferedChanges.Select(t => Tuple.Create("change", t)),
                    bridge.BufferedRemoves.Select(t => Tuple.Create("remove", t)))
                .Window(bulkStart, start => bulkEnd)
            from thingEvent in thingEventWindow
            select thingEvent;

        Action<Thing, string> addToList = (thing, msg) =>
        {
            var text = new TextBlock()
            {
                Text = string.Format(
                    "At:{0} Key:{1} Msg:{2}",
                    thing.TimeStamp,
                    thing.Key,
                    msg)
            };
            _panel.Children.Add(text);
        };

        _disposables.Add(bridgeWatcher.ObserveOnDispatcher().Subscribe(tup =>
        {
            addToList(tup.Item2, tup.Item1);
        }));
    }

    public void Dispose()
    {
        if (_disposables != null) _disposables.Dispose();
    }
}

public delegate void EventDelegateAdd(Thing thing);
public delegate void EventDelegateChange(Thing thing);
public delegate void EventDelegateRemove(Thing thing);

public delegate void EventDelegateBulkChangesStart();
public delegate void EventDelegateBulkChangesEnd();

// The "Things" that are stored in MyCustomDataSource

public class Thing
{
    public DateTime TimeStamp { get; set; }
    public string Key { get; set; }
    public string Title { get; set; }
    public object OtherStuff { get; set; }

    public override string ToString()
    {
        return string.Format("At:{0} Key:{1} Title:{2}", this.TimeStamp, this.Key, this.Title);
    }
}

// A custom observable data source with events that indicate when Things are
// added, changed, or removed.

public class MyCustomDataSource
{
    public event EventDelegateAdd AddingThing = delegate { };
    public event EventDelegateChange ChangingThing = delegate { };
    public event EventDelegateRemove RemovingThing = delegate { };

    // The rest of the class that manages the database of Things...
    public void FireAdd(Thing toAdd)
    {
        AddingThing(toAdd);
    }

    public void FireChange(Thing toChange)
    {
        ChangingThing(toChange);
    }

    public void FireRemove(Thing toRemove)
    {
        RemovingThing(toRemove);
    }
}

// This class forms a configurable event bridge between the MyCustomDataSource and
// the GUI. The goal is to cache, filter, and throttle the events so that the GUI
// updates only occasionally with bulk changes that are relevant for that control.

public class MyEventCachingBridge : IDisposable
{
    private enum ThingEventType
    {
        Add,
        Change,
        Remove
    }

    private MyCustomDataSource mSource;
    private IScheduler _scheduler;

    public event EventDelegateBulkChangesStart BulkChangesStart = delegate { };
    public event EventDelegateBulkChangesEnd BulkChangesEnd = delegate { };

    public IObservable<Thing> RawAdds { get; private set; }
    public IObservable<Thing> RawChanges { get; private set; }
    public IObservable<Thing> RawRemoves { get; private set; }

    public Subject<Thing> BufferedAdds { get; private set; }
    public Subject<Thing> BufferedChanges { get; private set; }
    public Subject<Thing> BufferedRemoves { get; private set; }

    private readonly IObservable<IList<Tuple<Thing, ThingEventType>>> _buffer;
    private List<IList<Tuple<Thing, ThingEventType>>> _eventQueue;
    private static object SyncRoot = new object();

    private readonly CompositeDisposable _disposables;
    private readonly SerialDisposable _watcherDisposable;

    public MyEventCachingBridge(MyCustomDataSource source, int eventRelayInterval, IScheduler scheduler)
    {
        _disposables = new CompositeDisposable();
        mSource = source;
        _scheduler = scheduler;
        _eventQueue = new List<IList<Tuple<Thing, ThingEventType>>>();

        // Magical Reactive Extensions code goes here that subscribes to all 3 events...
        //
        //   mSource.AddingThing
        //   mSource.ChangingThing
        //   mSource.RemovingThing
        //
        // ...filters and records a list of the events as they are received (maintaining order of events too),
        // then every eventRelayInterval milliseconds, plays back the events in bulk to update the GUI
        // (on the GUI's thread). Note that LINQ will be used to filter the Things so that a subset of
        // Thing changes are relayed to the GUI - i.e. - not all Thing events are observed by the GUI.
        RawAdds = Observable.FromEvent<EventDelegateAdd, Thing>(
            ev => new EventDelegateAdd(ev),
            h => mSource.AddingThing += h,
            h => mSource.AddingThing -= h);
        BufferedAdds = new Subject<Thing>();

        RawChanges = Observable.FromEvent<EventDelegateChange, Thing>(
            ev => new EventDelegateChange(ev),
            h => mSource.ChangingThing += h,
            h => mSource.ChangingThing -= h);
        BufferedChanges = new Subject<Thing>();

        RawRemoves = Observable.FromEvent<EventDelegateRemove, Thing>(
            ev => new EventDelegateRemove(ev),
            h => mSource.RemovingThing += h,
            h => mSource.RemovingThing -= h);
        BufferedRemoves = new Subject<Thing>();

        _buffer = Observable.Merge(
                _scheduler,
                RawAdds.Select(e => Tuple.Create(e, ThingEventType.Add)),
                RawChanges.Select(e => Tuple.Create(e, ThingEventType.Change)),
                RawRemoves.Select(e => Tuple.Create(e, ThingEventType.Remove)))
            .Buffer(TimeSpan.FromMilliseconds(eventRelayInterval), _scheduler);

        _watcherDisposable = new SerialDisposable();
        _watcherDisposable.Disposable = _buffer
            .ObserveOn(_scheduler)
            .Subscribe(batch =>
            {
                lock (SyncRoot) { _eventQueue.Add(batch); }
            });
        _disposables.Add(_watcherDisposable);

        var pulse = Observable.Interval(TimeSpan.FromMilliseconds(eventRelayInterval), _scheduler);
        _disposables.Add(pulse.ObserveOn(_scheduler).Subscribe(x => PlayBackCachedEvents()));
    }

    private void PlayBackCachedEvents()
    {
        BulkChangesStart(); // Raise event to notify GUI to suspend screen updates

        try
        {
            //_eventQueue.Dump();
            lock (SyncRoot)
            {
                foreach (var batch in _eventQueue)
                {
                    // Play back the list of events to push changes to ListView, TreeView, graphs, etc...
                    foreach (var evt in batch)
                    {
                        switch (evt.Item2)
                        {
                            case ThingEventType.Add: BufferedAdds.OnNext(evt.Item1); break;
                            case ThingEventType.Change: BufferedChanges.OnNext(evt.Item1); break;
                            case ThingEventType.Remove: BufferedRemoves.OnNext(evt.Item1); break;
                        }
                    }
                }
                _eventQueue.Clear();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception during playback: " + ex);
        }

        BulkChangesEnd(); // Raise event to notify GUI to allow control refresh
    }

    public void Dispose()
    {
        _disposables.Dispose();
    }
}
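
Since the whole bridge is parameterized on IScheduler, a test could drive it deterministically with a TestScheduler (from Microsoft.Reactive.Testing) instead of Thread.Sleep. A rough sketch, assuming the types above behave as described:

// Rough sketch only, not from the original answer.
var scheduler = new TestScheduler();
var source = new MyCustomDataSource();
var bridge = new MyEventCachingBridge(source, eventRelayInterval: 2000, scheduler: scheduler);

var relayed = new List<Thing>();
bridge.BufferedAdds.Subscribe(relayed.Add);

// advance the virtual clock slightly so the bridge's scheduled subscriptions are wired up
scheduler.AdvanceBy(1);

source.FireAdd(new Thing { Key = "a", TimeStamp = scheduler.Now.DateTime });
source.FireAdd(new Thing { Key = "b", TimeStamp = scheduler.Now.DateTime });

// nothing should be relayed yet; advance past a buffer boundary and a playback pulse
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(4100).Ticks);
// relayed should now contain the two batched adds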

In Rx, how to group events by id and throttle each group by multiple TimeSpans?

This works, I think - I'll try to add a fuller explanation later. Each alarm level has a defined threshold (per signal group); these are expected to be of increasing duration.

The basic idea is to have the signals of all previous levels feed into the current level. The first level is a "zero" level of the signals themselves that is filtered out before the alarm stream is returned. Note that TSignal keys need to support value identity.

I'm sure there's room for simplification!
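
The full implementation isn't reproduced here, but the basic building block being described - group by key, then apply a per-group time threshold - might be sketched like this (illustrative only; the method name, key selector, and single threshold are assumptions, not the original multi-level code):

// Illustrative sketch of the building block: group signals by key, then
// throttle each group independently with its own quiet-period timer.
public static IObservable<TSignal> ThrottlePerGroup<TSignal, TKey>(
    this IObservable<TSignal> signals,
    Func<TSignal, TKey> keySelector,
    TimeSpan threshold,
    IScheduler scheduler)
{
    return signals
        .GroupBy(keySelector)                              // keys need value identity, as noted above
        .SelectMany(g => g.Throttle(threshold, scheduler));
}

Layering several such thresholds of increasing duration, with each level fed by the previous level's output, corresponds to the "signals of all previous levels feed into the current level" idea described above.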
