Rx: How to Respond Immediately, and Throttle Subsequent Requests

The initial answer I posted has a flaw: the Window method, when used with Observable.Interval to mark the end of each window, sets up an infinite series of fixed 500ms windows. What I really need is a window that opens when the first result is pushed into the subject and closes 500ms later.

My sample data masked this problem because the data broke down nicely into the windows that were already going to be created. (i.e. 0-500ms, 501-1000ms, 1001-1500ms, etc.)

Consider instead this timing:

factory.StartNewDelayed(300, () =>
{
    Console.WriteLine("Batch 1 (300ms delay)");
    subject.OnNext(1);
});

factory.StartNewDelayed(700, () =>
{
    Console.WriteLine("Batch 2 (700ms delay)");
    subject.OnNext(2);
});

factory.StartNewDelayed(1300, () =>
{
    Console.WriteLine("Batch 3 (1.3s delay)");
    subject.OnNext(3);
});

factory.StartNewDelayed(1600, () =>
{
    Console.WriteLine("Batch 4 (1.6s delay)");
    subject.OnNext(4);
});

What I get is:

Batch 1 (300ms delay)

Handling 1 at 356ms

Batch 2 (700ms delay)

Handling 2 at 750ms

Batch 3 (1.3s delay)

Handling 3 at 1346ms

Batch 4 (1.6s delay)

Handling 4 at 1644ms

This is because the windows begin at 0ms, 500ms, 1000ms, and 1500ms and so each Subject.OnNext fits nicely into its own window.

What I want is:

Batch 1 (300ms delay)

Handling 1 at ~300ms

Batch 2 (700ms delay)

Batch 3 (1.3s delay)

Handling 3 at ~1300ms

Batch 4 (1.6s delay)

After a lot of struggling and an hour banging on it with a co-worker, we arrived at a better solution using pure Rx and a single local variable:

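bool isCoolingDown = false;

subject
    // drop items that arrive while the cooldown is active
    .Where(_ => !isCoolingDown)
    .Subscribe(
        i =>
        {
            DoStuff(i);

            isCoolingDown = true;

            // one-shot timer: reopen the gate after cooldownInterval
            Observable
                .Interval(cooldownInterval)
                .Take(1)
                .Subscribe(_ => isCoolingDown = false);
        });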

Our assumption is that calls to the subscription method are synchronized. If they are not, then a simple lock could be introduced.
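
If the calls turn out not to be synchronized, one way to introduce that lock is sketched below. This is only an illustration: CooldownGate and TryEnter are hypothetical names (not part of Rx), and the gate compares timestamps instead of starting a timer, so the check and the update happen under a single lock.

```csharp
using System;

// Hypothetical helper, not part of Rx: a thread-safe replacement for the
// isCoolingDown flag that compares timestamps instead of starting a timer.
public sealed class CooldownGate
{
    private readonly object _sync = new object();
    private readonly TimeSpan _cooldown;
    private DateTimeOffset _openAt = DateTimeOffset.MinValue;

    public CooldownGate(TimeSpan cooldown)
    {
        _cooldown = cooldown;
    }

    // Returns true if the item arriving at 'now' may be handled,
    // and in that case starts a new cooldown period.
    public bool TryEnter(DateTimeOffset now)
    {
        lock (_sync)
        {
            if (now < _openAt)
            {
                return false; // still cooling down: drop the item
            }

            _openAt = now + _cooldown;
            return true;
        }
    }
}
```

Assuming a gate created with the same cooldownInterval, it could be plugged in as subject.Where(_ => gate.TryEnter(DateTimeOffset.UtcNow)).Subscribe(DoStuff). Note that here the cooldown starts when the item arrives rather than after DoStuff returns, which differs slightly from the code above.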

How to use Reactive Extensions to throttle client requests

Yes, one way is to group the requests by client id and selectively apply a throttle.

Say you had an event like this:

public class MyEvent
{
    public int ClientId { get; set; }

    public override string ToString()
    {
        return ClientId.ToString();
    }
}

Let's set up slow and fast clients:

var slow = Observable.Interval(TimeSpan.FromSeconds(2))
    .Select(_ => new MyEvent { ClientId = 1 });

var fast = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(_ => new MyEvent { ClientId = 2 });

var all = slow.Merge(fast);

Now throttle selectively like this:

var throttled = all
    .GroupBy(x => x.ClientId)
    .Select(
        // apply the throttle here, this could even test the key
        // property to apply different throttles to different clients
        x => x.Throttle(TimeSpan.FromSeconds(1)))
    .SelectMany(x => x);

And test it:

throttled.Subscribe(x => Console.WriteLine(x.ToString())); 

With this throttle the fast client will never get a response: Throttle only emits an item once a full second has passed with no newer item, and the fast client's requests are only half a second apart, so they are suppressed indefinitely. You can use other operators to suppress in different ways, e.g. Sample can pick out a single request per time interval.
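
To see why the fast client starves, the timing rule can be traced by hand. The sketch below is not Rx code; ThrottleTrace and EmissionTimes are hypothetical names, and it only models the rule "emit an item one quiet period after it arrives, unless a newer item arrives first" on a finite, ascending list of arrival times in seconds.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of Throttle's timing rule, for illustration only.
public static class ThrottleTrace
{
    // arrivals: ascending arrival times in seconds.
    // Returns the times at which items would be emitted.
    public static List<double> EmissionTimes(IReadOnlyList<double> arrivals, double quiet)
    {
        var emitted = new List<double>();
        for (int i = 0; i < arrivals.Count; i++)
        {
            double due = arrivals[i] + quiet;

            // A newer item before the due time resets the timer,
            // so this item is never emitted.
            bool superseded = i + 1 < arrivals.Count && arrivals[i + 1] < due;
            if (!superseded)
            {
                emitted.Add(due);
            }
        }
        return emitted;
    }
}
```

With a quiet period of 1 second, slow arrivals at 2, 4 and 6 seconds are each emitted one second later, while fast arrivals at 0.5, 1.0, 1.5 and 2.0 seconds yield only the last item (at 3.0). In a fast stream that never pauses there is no last item, so nothing gets through.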

After your question edit

You are not limited to grouping by ClientId and applying Throttle. For example, you can use DistinctUntilChanged() on a client stream to weed out duplicate requests.

How to throttle event stream using RX?

Here is what I got with some help from the RX Forum:

The idea is to issue a series of "tickets" for the original sequence to fire. These "tickets" are delayed by the timeout, except for the very first one, which is immediately prepended to the ticket sequence. When an event comes in and there is a ticket waiting, the event fires immediately; otherwise it waits for the next ticket and then fires. When it fires, the next ticket is issued, and so on...

To combine the tickets and original events, we need a combinator. Unfortunately, the "standard" .CombineLatest cannot be used here because it would fire on tickets and events that were used previously. So I had to create my own combinator, which is basically a filtered .CombineLatest that fires only when both elements in the combination are "fresh", i.e. were never returned before. I call it .CombineVeryLatest aka .BrokenZip ;)

Using .CombineVeryLatest, the above idea can be implemented as such:

public static IObservable<T> SampleResponsive<T>(
    this IObservable<T> source, TimeSpan delay)
{
    return source.Publish(src =>
    {
        var fire = new Subject<T>();

        var whenCanFire = fire
            .Select(u => new Unit())
            .Delay(delay)
            .StartWith(new Unit());

        var subscription = src
            .CombineVeryLatest(whenCanFire, (x, flag) => x)
            .Subscribe(fire);

        return fire.Finally(subscription.Dispose);
    });
}

public static IObservable<TResult> CombineVeryLatest
    <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
    var ls = leftSource.Select(x => new Used<TLeft>(x));
    var rs = rightSource.Select(x => new Used<TRight>(x));
    var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
    var fltCmb = cmb
        .Where(a => !(a.x.IsUsed || a.y.IsUsed))
        .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
    return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
}

private class Used<T>
{
    internal T Value { get; private set; }
    internal bool IsUsed { get; set; }

    internal Used(T value)
    {
        Value = value;
    }
}

Edit: here's another more compact variation of CombineVeryLatest proposed by Andreas Köpf on the forum:

public static IObservable<TResult> CombineVeryLatest
    <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
                leftSource.Select(Tuple.Create<TLeft, int>),
                rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}
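
The ticket scheme itself can also be traced without Rx, which may help when checking expected timings. The sketch below is a hand-rolled model, not the operator: TicketTrace and Fire are hypothetical names, times are plain milliseconds, and it assumes (as with CombineLatest) that only the newest waiting event survives until the next ticket.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the ticket scheme, for illustration only.
public static class TicketTrace
{
    // arrivals: ascending arrival times in ms (times double as event ids).
    // Returns (arrival, firedAt) pairs for the events that fire.
    public static List<(long Arrival, long FiredAt)> Fire(IReadOnlyList<long> arrivals, long delay)
    {
        var fired = new List<(long Arrival, long FiredAt)>();
        long ticketAt = 0;     // the first ticket is available immediately
        long? waiting = null;  // newest event that arrived without a ticket

        foreach (long t in arrivals)
        {
            // A ticket became due before this arrival: the waiting event takes it.
            if (waiting.HasValue && ticketAt <= t)
            {
                fired.Add((waiting.Value, ticketAt));
                ticketAt += delay;
                waiting = null;
            }

            if (ticketAt <= t)
            {
                fired.Add((t, t));      // a ticket is waiting: fire immediately
                ticketAt = t + delay;   // firing issues the next ticket
            }
            else
            {
                waiting = t;            // no ticket yet: keep only the newest event
            }
        }

        // The last waiting event fires when its ticket arrives.
        if (waiting.HasValue)
        {
            fired.Add((waiting.Value, ticketAt));
        }
        return fired;
    }
}
```

For arrivals at 300, 700, 1300 and 1600 ms with a 500 ms delay, this model fires at 300, 800, 1300 and 1800 ms: no event is lost, but events that arrive too early are held back until the next ticket.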

Reactive (RX) throttle without loss

I think you will have to write your own operator, or do some toying around with Window. Like the other commenters, I am not 100% sure of your requirements, but I have tried to capture them in these tests.

using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

[TestFixture]
public class Throttle : ReactiveTest
{
    private TestScheduler _testScheduler;
    private ITestableObservable<int> _sourceSequence;
    private ITestableObserver<int> _observer;

    [SetUp]
    public void SetUp()
    {
        var windowPeriod = TimeSpan.FromSeconds(5);
        _testScheduler = new TestScheduler();
        _sourceSequence = _testScheduler.CreateColdObservable(
            //Question: does the window start when the event starts, or at time 0?
            OnNext(0.1.Seconds(), 1),
            OnNext(1.0.Seconds(), 2),
            OnNext(2.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),
            OnCompleted<int>(100.0.Seconds())
        );

        _observer = _testScheduler.CreateObserver<int>();
        _sourceSequence
            .Window(windowPeriod, _testScheduler)
            .SelectMany(window =>
                window.Publish(
                    shared => shared.Take(1).Concat(shared.Skip(1).TakeLast(1))
                )
            )
            .Subscribe(_observer);
        _testScheduler.Start();
    }

    [Test]
    public void Should_eagerly_publish_new_events()
    {
        Assert.AreEqual(OnNext(0.1.Seconds(), 1), _observer.Messages[0]);
    }

    [Test]
    public void Should_publish_last_event_of_a_window()
    {
        //OnNext(1.0.Seconds(), 2) is ignored: 3 arrives after it and before the end of the window, so 3 is yielded when the window closes at 5.0 seconds.
        Assert.AreEqual(OnNext(5.0.Seconds(), 3), _observer.Messages[1]);
    }

    [Test]
    public void Should_only_publish_event_once_if_it_is_the_only_event_for_the_window()
    {
        Assert.AreEqual(OnNext(7.0.Seconds(), 4), _observer.Messages[2]);
        Assert.AreEqual(OnCompleted<int>(100.0.Seconds()), _observer.Messages[3]);
    }

    [Test]
    public void AsOneTest()
    {
        var expected = new[]
        {
            OnNext(0.1.Seconds(), 1),
            //OnNext(1.0.Seconds(), 2) is ignored: 3 arrives after it and before the end of the window, so 3 is yielded when the window closes at 5.0 seconds.
            OnNext(5.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),
            OnCompleted<int>(100.0.Seconds())
        };
        CollectionAssert.AreEqual(expected, _observer.Messages);
    }
}

Using Rx to debounce

Take a look at this demo: Curing Your Event Processing Blues with Reactive Extensions (Rx)

'Sliding' RX.net .Throttle() Window

As @nikoniko already mentioned, throttle will do the trick.

using System;
using System.Reactive.Linq;

namespace Printing {
    class Program {
        static void Main(string[] args) {
            var source = Observable.Interval(TimeSpan.FromMilliseconds(333))
                .Do(i => Console.WriteLine($"new item: {i}"));
            var sampling = source.Throttle(TimeSpan.FromSeconds(1))
                .Do(i => Console.WriteLine($"sampled: {i}"));

            var subscription = sampling.Subscribe();

            Console.ReadLine();

            subscription.Dispose();

            Console.ReadLine();
        }
    }
}

This prints no sampled items, because the events from source arrive at too high a frequency. But if source needs more time to deliver an element than the timespan given to Throttle:

using System;
using System.Reactive.Linq;

namespace Printing {
    class Program {
        static void Main(string[] args) {
            // HH:mm:ss is used so the seconds are visible, as in the output below
            var source = Observable.Interval(TimeSpan.FromSeconds(1.2))
                .Do(i => Console.WriteLine($"{DateTime.Now:HH:mm:ss}: new item: {i}"));
            var sampling = source.Throttle(TimeSpan.FromSeconds(1))
                .Do(i => Console.WriteLine($"{DateTime.Now:HH:mm:ss}: throttle {i}"));

            var subscription = sampling.Subscribe();

            Console.ReadLine();

            subscription.Dispose();

            Console.ReadLine();
        }
    }
}

The result appears once the throttling time is over. As you can see, one second after an event is fired in source, it appears in the result.

08:32:26: new item: 0
08:32:27: throttle 0
08:32:28: new item: 1
08:32:29: throttle 1
08:32:30: new item: 2
08:32:31: throttle 2
08:32:32: new item: 3
08:32:33: throttle 3
08:32:34: new item: 4
08:32:35: throttle 4
08:32:36: new item: 5
08:32:37: throttle 5

How to throttle requests to a cold flowable upstream

One of the RxJavaExtensions (RxJava2 version) transformers should be a solution to your problem – spanout(). It inserts delay between emissions from upstream. I have changed just one line in your code (replaced concatMap() by spanout()):

val startTime = System.currentTimeMillis()
fun log(msg: String) {
    println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
}

val generator = Flowable.generate<Int, Int>(
    Callable { 0 },
    BiFunction { state, emitter ->
        val value = state + 1
        log("generating $value")
        emitter.onNext(value)
        return@BiFunction value
    })

val subscription = generator
    .compose(FlowableTransformers.spanout(1, 1, TimeUnit.SECONDS)) // <-- changed line
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io(), true, 1)
    .subscribe {
        log("processing $it")
        if (it % 5 == 0) {
            log("starting sleep")
            try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
            log("done sleeping")
        }
    }

Thread.sleep(8_000)
subscription.dispose()
log("done")

Produced output:

RxComputationThreadPool-1 -  153 - generating 1
RxCachedThreadScheduler-1 - 1178 - processing 1
RxComputationThreadPool-1 - 1179 - generating 2
RxCachedThreadScheduler-1 - 2176 - processing 2
RxComputationThreadPool-1 - 2177 - generating 3
RxCachedThreadScheduler-1 - 3177 - processing 3
RxComputationThreadPool-1 - 3178 - generating 4
RxCachedThreadScheduler-1 - 4175 - processing 4
RxComputationThreadPool-1 - 4175 - generating 5
RxCachedThreadScheduler-1 - 5177 - processing 5
RxCachedThreadScheduler-1 - 5178 - starting sleep
RxCachedThreadScheduler-1 - 7383 - done sleeping
RxComputationThreadPool-1 - 7384 - generating 6
RxCachedThreadScheduler-1 - 7384 - processing 6
RxComputationThreadPool-1 - 7385 - generating 7
main - 8151 - done

Conditional delay+throttle operator

The question isn't completely clear, so I'm using the following test case as a scenario:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(10)
    .DelayWhen(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0)

This should result in the following:

//        T: ---1---2---3---4---5---6---7---8---9---0---1----
// original: ---0---1---2---3---4---5---6---7---8---9
//   delay?: ---Y---N---Y---Y---Y---N---Y---N---Y---Y
// expected: -------1---------2-----5-------7---------8
//
// 0: Delayed, but interrupted by 1,
// 1: Non-delayed, emit immediately
// 2: Delayed, emit after 1.5 seconds
// 3: Delayed, since emitted during a delay, ignored
// 4: Delayed, but interrupted by 5.
// 5: Non-delayed, emit immediately
// 6: Delayed, but interrupted by 7.
// 7: Non-delayed, emit immediately
// 8: Delayed, but interrupted by 9
// 9: Delayed, since emitted during a delay, ignored

If that doesn't line up with the requirements, please clarify the question. @Theodore's solution gets the timing right, but emits 3 and 9, ignoring the "cancel/skip the value scheduled for delayed emission and emit the new value" clause.

This is functionally equivalent to Theodore's code, but (IMO) easier to work with and understand:

public static IObservable<T> DelayWhen2<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
    return source
        .Select(x => (Item: x, WithDelay: condition(x)))
        .Publish(published => published
            .SelectMany(t => t.WithDelay
                ? Observable.Return(t)
                    .Delay(delay, scheduler)
                    .TakeUntil(published.Where(t2 => !t2.WithDelay))
                : Observable.Return(t)
            )
        )
        .Select(e => e.Item);
}

From there, I had to embed the state of whether or not you're in delay with .Scan:

public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition)
{
    return DelayWhen3(source, delay, condition, Scheduler.Default);
}

public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
    return source
        .Select(x => (Item: x, WithDelay: condition(x)))
        .Publish(published => published
            .Timestamp(scheduler)
            .Scan((delayOverTime: DateTimeOffset.MinValue, output: Observable.Empty<T>()), (state, t) => {
                if (!t.Value.WithDelay)
                    //value isn't delayed, current delay status irrelevant, emit immediately, and cancel previous delay.
                    return (DateTimeOffset.MinValue, Observable.Return(t.Value.Item));
                else if (state.delayOverTime > t.Timestamp)
                    //value should be delayed, but current delay already in progress. Ignore value.
                    return (state.delayOverTime, Observable.Empty<T>());
                else
                    //value should be delayed, no delay in progress. Set delay state, and return delayed observable.
                    return (t.Timestamp + delay, Observable.Return(t.Value.Item).Delay(delay, scheduler).TakeUntil(published.Where(t2 => !t2.WithDelay)));
            })
        )
        .SelectMany(t => t.output);
}

In the .Scan operator, you embed the time when the previous Delay expires. That way you can handle a value that should be delayed but arrives while an existing delay is still in progress. I added scheduler parameters to the time-sensitive functions to enable testing:

var ts = new TestScheduler();

var target = Observable.Interval(TimeSpan.FromSeconds(1), ts)
    .Take(10)
    .DelayWhen3(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0, ts);

var observer = ts.CreateObserver<long>();
target.Subscribe(observer);
ts.Start();

var expected = new List<Recorded<Notification<long>>> {
    new Recorded<Notification<long>>(2000.MsTicks(), Notification.CreateOnNext<long>(1)),
    new Recorded<Notification<long>>(4500.MsTicks(), Notification.CreateOnNext<long>(2)),
    new Recorded<Notification<long>>(6000.MsTicks(), Notification.CreateOnNext<long>(5)),
    new Recorded<Notification<long>>(8000.MsTicks(), Notification.CreateOnNext<long>(7)),
    new Recorded<Notification<long>>(10500.MsTicks(), Notification.CreateOnNext<long>(8)),
    new Recorded<Notification<long>>(10500.MsTicks() + 1, Notification.CreateOnCompleted<long>()),
};

ReactiveAssert.AreElementsEqual(expected, observer.Messages);

And code for MsTicks:

public static long MsTicks(this int i)
{
    return TimeSpan.FromMilliseconds(i).Ticks;
}
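
For anyone who wants to check the expected timings without Rx or a TestScheduler, the state that .Scan carries can be traced with a plain loop. This is only a sketch of the same rules, not the operator: DelayWhenTrace and Run are hypothetical names, times are milliseconds, and a pending delayed value stands in for the delayOverTime state.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the DelayWhen3 rules, for illustration only.
public static class DelayWhenTrace
{
    // items: (arrival time in ms, value) in ascending time order.
    // Returns (emission time in ms, value) pairs.
    public static List<(long Time, long Value)> Run(
        IReadOnlyList<(long Time, long Value)> items, long delay, Func<long, bool> condition)
    {
        var output = new List<(long Time, long Value)>();
        (long Time, long Value)? pending = null; // delayed value not yet emitted

        foreach (var item in items)
        {
            // Emit a pending value whose delay ran out before this arrival.
            if (pending.HasValue && pending.Value.Time <= item.Time)
            {
                output.Add(pending.Value);
                pending = null;
            }

            if (!condition(item.Value))
            {
                pending = null;    // cancel any delay in progress
                output.Add(item);  // non-delayed: emit immediately
            }
            else if (!pending.HasValue)
            {
                pending = (item.Time + delay, item.Value); // start a new delay
            }
            // else: a delay is already in progress, so the value is ignored
        }

        // Nothing arrives after the last item, so a pending value is emitted.
        if (pending.HasValue)
        {
            output.Add(pending.Value);
        }
        return output;
    }
}
```

Feeding it the values 0 to 9 at one-second intervals with a 1500 ms delay and the same condition gives 1 at 2000, 2 at 4500, 5 at 6000, 7 at 8000 and 8 at 10500, matching the expected notifications in the test above.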

