How to See What My Reactive Extensions Query Is Doing

How can I see what my reactive extensions query is doing?

You can append this function liberally to your Rx operators while you are developing them to see what's happening:

    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
{
opName = opName ?? "IObservable";
Console.WriteLine("{0}: Observable obtained on Thread: {1}",
opName,
Thread.CurrentThread.ManagedThreadId);

return Observable.Create<T>(obs =>
{
Console.WriteLine("{0}: Subscribed to on Thread: {1}",
opName,
Thread.CurrentThread.ManagedThreadId);

try
{
var subscription = source
.Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
opName,
x,
Thread.CurrentThread.ManagedThreadId),
ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
opName,
ex,
Thread.CurrentThread.ManagedThreadId),
() => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
opName,
Thread.CurrentThread.ManagedThreadId)
)
.Subscribe(obs);
return new CompositeDisposable(
subscription,
Disposable.Create(() => Console.WriteLine(
"{0}: Cleaned up on Thread: {1}",
opName,
Thread.CurrentThread.ManagedThreadId)));
}
finally
{
Console.WriteLine("{0}: Subscription completed.", opName);
}
});
}

Here's an example usage, shows a subtle behaviour difference of Range:

Observable.Range(0, 1).Spy("Range").Subscribe();

Gives the output:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7

But this:

Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();

Gives the output:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Subscription completed.
Range: Cleaned up on Thread: 7

Spot the difference?

Obviously you can alter this to write to logs or to Debug, or use preprocessor directives to do a lean pass-through subscription on a Release build etc...

You can apply Spy throughout a chain of operators. e.g.:

Observable.Range(0,3).Spy("Range")
.Scan((acc, i) => acc + i).Spy("Scan").Subscribe();

Gives the output:

Range: Observable obtained on Thread: 7
Scan: Observable obtained on Thread: 7
Scan: Subscribed to on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Scan: Subscription completed.
Range: OnNext(1) on Thread: 7
Scan: OnNext(1) on Thread: 7
Range: OnNext(2) on Thread: 7
Scan: OnNext(3) on Thread: 7
Range: OnCompleted() on Thread: 7
Scan: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7
Scan: Cleaned up on Thread: 7

I'm sure you can find ways of enriching this to suit your purposes.

Search on TextChanged with Reactive Extensions

I think you want something like this. EDIT: From your comments, I see you have a synchronous repository API - I'll leave the asynchronous version in, and add a synchronous version afterwards. Notes inline:

Asynchronous Repository Version

An asynchronous repository interface could be something like this:

public interface IPartyRepository
{
Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
}

Then I refactor the query as:

var searchStream = Observable.FromEventPattern(
s => txtSearch.TextChanged += s,
s => txtSearch.TextChanged -= s)
.Select(evt => txtSearch.Text) // better to select on the UI thread
.Throttle(TimeSpan.FromMilliseconds(300))
.DistinctUntilChanged()
// placement of this is important to avoid races updating the UI
.ObserveOn(SynchronizationContext.Current)
.Do(_ =>
{
// I like to use Do to make in-stream side-effects explicit
this.parties.Clear();
this.partyBindingSource.ResetBindings(false);
})
// This is "the money" part of the answer:
// Don't subscribe, just project the search term
// into the query...
.Select(searchTerm =>
{
long partyCount;
var foundParties = string.IsNullOrEmpty(searchTerm)
? partyRepository.GetAllAsync(out partyCount)
: partyRepository.SearchByNameAndNotesAsync(searchTerm);

// I assume the intention of the Buffer was to load
// the data into the UI in batches. If so, you can use Buffer from nuget
// package Ix-Main like this to get IEnumerable<T> batched up
// without splitting it up into unit sized pieces first
return foundParties
// this ToObs gets us into the monad
// and returns IObservable<IEnumerable<Party>>
.ToObservable()
// the ToObs here gets us into the monad from
// the IEnum<IList<Party>> returned by Buffer
// and the SelectMany flattens so the output
// is IObservable<IList<Party>>
.SelectMany(x => x.Buffer(500).ToObservable())
// placement of this is again important to avoid races updating the UI
// erroneously putting it after the Switch is a very common bug
.ObserveOn(SynchronizationContext.Current);
})
// At this point we have IObservable<IObservable<IList<Party>>
// Switch flattens and returns the most recent inner IObservable,
// cancelling any previous pending set of batched results
// superceded due to a textbox change
// i.e. the previous inner IObservable<...> if it was incomplete
// - it's the equivalent of your TakeUntil, but a bit neater
.Switch()
.Subscribe(searchResults =>
{
this.parties.AddRange(searchResults);
this.partyBindingSource.ResetBindings(false);
},
ex => { },
() => { });

Synchronous Repository Version

An synchronous repository interface could be something like this:

public interface IPartyRepository
{
IEnumerable<Party> GetAll(out long partyCount);
IEnumerable<Party> SearchByNameAndNotes(string searchTerm);
}

Personally, I don't recommend a repository interface be synchronous like this. Why? It is typically going to do IO, so you will wastefully block a thread.

You might say the client could call from a background thread, or you could wrap their call in a task - but this is not the right way to go I think.

  • The client doesn't "know" you are going to block; it's not expressed in the contract
  • It should be the repository that handles the asynchronous aspect of the implementation - after all, how this is best achieved will only be known best by the repository implementer.

Anyway, accepting the above, one way to implement is like this (of course it's mostly similar to the async version so I've only annotated the differences):

var searchStream = Observable.FromEventPattern(
s => txtSearch.TextChanged += s,
s => txtSearch.TextChanged -= s)
.Select(evt => txtSearch.Text)
.Throttle(TimeSpan.FromMilliseconds(300))
.DistinctUntilChanged()
.ObserveOn(SynchronizationContext.Current)
.Do(_ =>
{
this.parties.Clear();
this.partyBindingSource.ResetBindings(false);
})
.Select(searchTerm =>
// Here we wrap the synchronous repository into an
// async call. Note it's simply not enough to call
// ToObservable(Scheduler.Default) on the enumerable
// because this can actually still block up to the point that the
// first result is yielded. Doing as we have here,
// we guarantee the UI stays responsive
Observable.Start(() =>
{
long partyCount;
var foundParties = string.IsNullOrEmpty(searchTerm)
? partyRepository.GetAll(out partyCount)
: partyRepository.SearchByNameAndNotes(searchTerm);

return foundParties;
}) // Note you can supply a scheduler, default is Scheduler.Default
.SelectMany(x => x.Buffer(500).ToObservable())
.ObserveOn(SynchronizationContext.Current))
.Switch()
.Subscribe(searchResults =>
{
this.parties.AddRange(searchResults);
this.partyBindingSource.ResetBindings(false);
},
ex => { },
() => { });

Database polling with Reactive Extensions

This is a fairly classic case of using Rx to poll another system. Most people will use Observable.Interval as their go-to operator, and for most it will be fine.

However you have specific requirements on timeouts and retry. In this case I think you are better off using a combination of operators:

  • Observable.Timer to allow you to execute your query in a specified time
  • Timeout to identify and database queries that have overrun
  • ToObservable() to map your Task results to an observable sequence.
  • Retry to allow you to recover after timeouts
  • Repeat to allow you to continue after successful database queries. This will also keep that initial period/gap between the completion of the previous database query and the commencement of the next one.

This working LINQPad snippet should show you the query works properly:

void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);

//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;

var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });

var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success

query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}

// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}

The results look like:

Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606

The key part of the sample is....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success

EDIT:
Here is a further explanation of how to arrive at this solution. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

Reactive Extensions Observable from a collection of web queries

Since the reactive framework can handle the asynchrony for you you could try this:

var syms = new List<string>() { "ANZ", "BHP", };

var query =
from i in Observable.Interval(TimeSpan.FromSeconds(1.0))
from sym in syms.ToObservable()
from d in GoToWeb(sym).ToObservable()
select new
{
Symbol = sym,
Value = d,
};

You'll need to add a reference to the namespace System.Reactive.Threading.Tasks to get the ToObservable() extension for tasks.

Does this meet your needs?

Is there a way to listen that there are no events that are being raised in Reactive Extensions?

Here is all the code to show how the code above could be ported to WPF. It appears that there is a gap in communication here, so I have created the whole wpf app to prove the point.

<Window x:Class="StackoverFlow_23764884.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
Title="MainWindow" Height="350" Width="525">
<StackPanel>
<HeaderedContentControl Header="When checked, spellcheck is enabled (emulated)">
<CheckBox x:Name="spellChecking" IsChecked="True" IsEnabled="False"/>
</HeaderedContentControl>

<HeaderedContentControl Header="Type here to see the Spellcheck enable and disable">
<RichTextBox x:Name="meh" Width="400" Height="300" />
</HeaderedContentControl>
</StackPanel>
</Window>

And the code behind:

using System;
using System.Reactive.Linq;
using System.Windows;
using System.Windows.Controls;

namespace StackoverFlow_23764884
{
/// <summary>
/// Interaction logic for MainWindow.xaml
/// </summary>
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();

var source = meh.ObserveTextChanged();
var disableSpellcheck = source.Select(_ => false);
var enableSpellcheck = source.Select(_ => Observable.Timer(TimeSpan.FromSeconds(1)))
.Switch()
.Select(_ => true);

disableSpellcheck.Merge(enableSpellcheck)
.DistinctUntilChanged()
.ObserveOnDispatcher()
.Subscribe(isEnabled => spellChecking.IsChecked=isEnabled);
}

}

public static class ObEx
{
public static IObservable<EventArgs> ObserveTextChanged(this RichTextBox rtb)
{
return Observable.FromEventPattern<TextChangedEventHandler, EventArgs>(
h => rtb.TextChanged += h,
h => rtb.TextChanged -= h)
.Select(ep => ep.EventArgs);
}
}
}

I pulled in the Rx-WPF nuget packeage will pulled everything else required for the code. This is .NET 4.5.

This is a sample to show how to solve the problem. i.e. I don't recommend using .ObserveOnDispatcher(), I dont recommend writing code-behind, and I know that setting IsEnabled on a checkbox is not actually doing a spellcheck. I am hoping that this is enough to the audience to recreate their actual solution.

I do hope it helps though.

Reactive Extensions seem very slow - am I doing something wrong?

My guess is that the Rx team focuses on building the functionality first and doesn't care about performance optimization yet.

Use a profiler to determine bottlenecks and replace slow Rx classes with your own optimized versions.

Below are two examples.

Results:


Delegate - (1000000) - 00:00:00.0368748

Simple - NewThread - (1000000) - 00:00:00.0207676
Simple - CurrentThread - (1000000) - 00:00:00.0214599
Simple - Immediate - (1000000) - 00:00:00.0162026
Simple - ThreadPool - (1000000) - 00:00:00.0169848

FastSubject.Subscribe() - NewThread - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool - (1000000) - 00:00:00.0529137

First of all, it seems to matter a lot how the observable is implemented. Here's an observable that cannot be unsubscribed from, but it's fast:

private IObservable<int> CreateFastObservable(int iterations)
{
return Observable.Create<int>(observer =>
{
new Thread(_ =>
{
for (int i = 0; i < iterations; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
}).Start();
return () => { };
});
}

Test:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;

var start = Stopwatch.StartNew();

var observable = CreateFastObservable(iterations);

observable.SubscribeOn(scheduler).Run(action);

OutputTestDuration("Simple - " + mode, start);
}

Subjects add a lot of overhead. Here's a subject that is stripped of much of the functionality expected from a subject, but it's fast:

class FastSubject<T> : ISubject<T>
{
private event Action onCompleted;
private event Action<Exception> onError;
private event Action<T> onNext;

public FastSubject()
{
onCompleted += () => { };
onError += error => { };
onNext += value => { };
}

public void OnCompleted()
{
this.onCompleted();
}

public void OnError(Exception error)
{
this.onError(error);
}

public void OnNext(T value)
{
this.onNext(value);
}

public IDisposable Subscribe(IObserver<T> observer)
{
this.onCompleted += observer.OnCompleted;
this.onError += observer.OnError;
this.onNext += observer.OnNext;

return Disposable.Create(() =>
{
this.onCompleted -= observer.OnCompleted;
this.onError -= observer.OnError;
this.onNext -= observer.OnNext;
});
}
}

Test:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;

var start = Stopwatch.StartNew();

var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();

observable.SubscribeOn(scheduler).Run(action);

OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}

Reactive Extensions Instant Search for WPF/MVVM

Got this working with ReactiveUI.

The solution is based on a blog post at ReactiveUI, but the code there is a little bit out of date. I am hosting the solution on BitBucket for ease of access. It uses ReactiveUI 5.5.1.

This is the ViewModel from that solution. SearchText is bound to a TextBox in the View where the user types his query, while SearchResults is bound to a ListBox displaying the results.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using System.ComponentModel;
using System.Reactive.Linq;
using System.Windows.Input;

using ReactiveUI;

namespace ReactiveExtensionsSearch
{
public class MainWindowViewModel : ReactiveObject
{
private string[] _repository = new string[]
{ "Mario", "Maria", "Gigi", "Jack", "James", "Jeremy" };
private ObservableAsPropertyHelper<ObservableCollection<string>> _searchResults;
private string _searchText;
private ICommand _executeSearchCommand;

public string SearchText
{
get
{
return _searchText;
}
set
{
this.RaiseAndSetIfChanged(ref _searchText, value);
}
}

public ObservableCollection<string> SearchResults
{
get
{
return _searchResults.Value;
}
}

public MainWindowViewModel()
{
var executeSearchCommand = new ReactiveCommand();
var results = executeSearchCommand.RegisterAsyncFunction(s => { return ExecuteSearch(s as string); });
_executeSearchCommand = executeSearchCommand;

this.ObservableForProperty<MainWindowViewModel, string>("SearchText")
.Throttle(TimeSpan.FromMilliseconds(800))
.Select(x => x.Value)
.DistinctUntilChanged()
.Where(x => !string.IsNullOrWhiteSpace(x))
.Subscribe(_executeSearchCommand.Execute);

_searchResults = new ObservableAsPropertyHelper<ObservableCollection<string>>(results, _ => raisePropertyChanged("SearchResults"));
}

private ObservableCollection<string> ExecuteSearch(string searchText)
{
var q = from s in _repository where s.ToLower().StartsWith(searchText.ToLower()) select s;
var results = new ObservableCollection<string>(q);
return results;
}
}
}

Reactive extensions(Rx) Switch() produces new observable which is not subscribed to provided OnCompleted()

Switch result will only Complete when your outer observable (_performSearchSubject) completes. I assume in your case this one never does (it's probably bound to a user action performing the search).

What's unclear is when you expect PositionQueryCompleted to be called. If It's after each and every successful query is processed, then your stream needs to be modified, because Switch lost you the information that the query stream completed, but it also lacks information about the UI (wrong scheduler even) to say whether its data was actually processed.

There may be other ways to achieve it, but basically you want your query stream complete to survive through Switch (which currently ignore this event). For instance you can transform your query stream to have n+1 events, with one extra for the complete:

    _performSearchSubject
.AsObservable()
.Select(_ =>
PerformQuery()
.Select(Data => new { Data, Complete = false})
.Concat(Observable.Return(new { Data = (string)null, Complete = true })))

You can safely apply .Switch().ObserveOn(_synchronizationContextService.SynchronizationContext) on it, but then you need to modify your subscription:

    .Subscribe(data => {
if (data.Complete) DataArrivedForPositions(data.Data);
else PositionQueryCompleted()
}, PositionQueryError)

Poll a webservice using Reactive Extensions and bind the last x results

Try this:

// timer that completes after 1 second
var intervalTimer = Observable
.Empty<string>()
.Delay(TimeSpan.FromSeconds(1));

// queries one time whenever subscribed
var query = Observable.FromAsync(GetCurrentDate);

// query + interval timer which completes
// only after both the query and the timer
// have expired

var intervalQuery = Observable.Merge(query, intervalTimer);

// Re-issue the query whenever intervalQuery completes
var queryLoop = intervalQuery.Repeat();

// Keep the 20 most recent results
// Note. Use an immutable list for this
// https://www.nuget.org/packages/microsoft.bcl.immutable
// otherwise you will have problems with
// the list changing while an observer
// is still observing it.
var recentResults = queryLoop.Scan(
ImmutableList.Create<string>(), // starts off empty
(acc, item) =>
{
acc = acc.Add(item);
if (acc.Count > 20)
{
acc = acc.RemoveAt(0);
}

return acc;
});

// store the results
recentResults
.ObserveOnDispatcher()
.Subscribe(items =>
{
this.CurrentTime = items[0];
this.RecentItems = items;
});


Related Topics



Leave a reply



Submit