Good Introduction to the .Net Reactive Framework

Good introduction to the .NET Reactive Framework

Here's a wiki site with lots of code examples demonstrating how to use different features of the .NET Rx framework: http://rxwiki.wikidot.com/101samples

I found this to be the most comprehensive site out there, and the one that's quickest to get started with.

Reactive Framework for .NET examples that prove its usefulness

Here is a quick example. Program a drag operation in a fully declarative manner, using LINQ to events.

   //Create an observable with the initial position and dragged points using LINQ to Events
var mouseDragPoints = from md in e.GetMouseDown()
let startpos=md.EventArgs.GetPosition(e)
from mm in e.GetMouseMove().Until(e.GetMouseUp())
select new
{
StartPos = startpos,
CurrentPos = mm.EventArgs.GetPosition(e),
};

And draw a line from startpos to current pos

//Subscribe and draw a line from start position to current position  
mouseDragPoints.Subscribe
(item =>
{
//Draw a line from item.Startpos to item.CurrentPos
}
);

As you can see, there are no event handlers all over the places, nor boolean variables for managing the state.

If you are curious about those GetEventName() methods, suggesting you to read this entire article and download the source code and play with it.

Read it here and play with the source >>

Usage of Reactive Extensions in the Real World

In less than an hour I was able to add Rx support to MassTransit, an open source ESB:

https://github.com/MassTransit/MassTransit/tree/master/src/MassTransit.Reactive

Update: As for why it's a good fit, they already had a Subscribe/Unsubscribe mechanism in place. Adding Rx support means that those subscriptions can now be composed together easily. For example, you might have two kinds of messages that share some CorrelationId. With Rx you can trivially Join() the published messages by that identifier:

var someMessages = bus.AsObservable<SomeMessage>();
var otherMessages = bus.AsObservable<AnotherMessage>();

var joined = from s in someMessages
join o in otherMessages
on s.CorrelationId equals o.CorrelationId
select new { s.Something, o.OtherThing };

joined.Subscribe(x => Console.WriteLine(x));

Also: Check out https://github.com/reactiveui/ReactiveUI for an Rx-powered MVVM framework targeting XAML (WPF, Silverlight, WP), iOS and Android. Very, very cool stuff.

Reactive Extension (Rx) tutorial that is up to date

When learning Rx the first thing is to understand the philosophy behind IObservable and how it's push based nature compares with IEnumerable. I suggest the following one for a good explanation: A[nother] Simpler Tutorial for Reactive Extensions

Lee Campbell has nice series explaining the api and when to use them. He also tries to keep it up to date with latest releases: Reactive Extensions for .NET an Introduction
The series is now available as a book at Introduction to Rx

By the way, I have also written a blog post about solving real life problem with rx: Using Reactive Extensions for Streaming Data from Database

Hope this helps.

Guide to System.Reactive.Joins

Found an excellent SO question that shows the usage, but to me the overall purpose of Plan and Pattern is to create a compositional unit of observable as opposed to a composed observable. Semantics, I know, but to me it seems a little easier to use this syntax then the various other "Join" methods. It allows you to separate the join from the projection altogether, so you can store intermediate plans and compose them with other observables whenever you want.

For example:

// Suppose we have observables o1, o2, ..., o9. 
// All IObservable<int>.

var o1and2 = o1.And(o2); // Store this bad boy for later use. Pattern<int, int>

var o5and6and9 = o5
.And(o6)
.And(o9)
.Then((t1, t2, t3) => t1 + t2 + t3); // Plan<int>

var o3and7 = o3
.And(o7)
.Then((t1, t2) => string.Format("Result: {0}", t1 + t2)); // Plan<string>

var o12ando8and6 = o1and2
.And(o8)
.And(o6)
.Then((t1, t2, t3, t4) => ((decimal) t1, t2, t3.ToString(), t4));
// Plan<(decimal, int, string, int)>

// "When" groups similar results together.
// It will fire when any of the Patterns give a result.

var obs1 = Observable
.When(o1and2.Then((t1,t2) => t1+t2), o5and6and9); // IObservable<int>

var obs2 = Observable.When(o3and7); // IObservable<string>
var obs3 = Observable.When(o12ando8and6); // IObservable<(decimal, int, string,int)>

SO Article:
Reactive Extensions for .NET (Rx): Take action once all events are completed

Also, found an RX document that actually helped in understanding HOW to use this: http://www.clipcode.net/mentoring/RxReferenceLibrary.pdf [dead]

Why are Subjects not recommended in .NET Reactive Extensions?

Ok,
If we ignore my dogmatic ways and ignore "subjects are good/bad" all together. Let us look at the problem space.

I bet you either have 1 of 2 styles of system you need to ingrate to.

  1. The system raises an event or a call back when a message arrives
  2. You need to poll the system to see if there are any message to process

For option 1, easy, we just wrap it with the appropriate FromEvent method and we are done. To the Pub!

For option 2, we now need to consider how we poll this and how to do this effciently. Also when we get the value, how do we publish it?

I would imagine that you would want a dedicated thread for polling. You wouldn't want some other coder hammering the ThreadPool/TaskPool and leaving you in a ThreadPool starvation situation. Alternatively you don't want the hassle of context switching (I guess). So assume we have our own thread, we will probably have some sort of While/Sleep loop that we sit in to poll. When the check finds some messages we publish them. Well all of this sounds perfect for Observable.Create. Now we probably cant use a While loop as that wont allow us to ever return a Disposable to allow cancellation. Luckily you have read the whole book so are savvy with Recursive scheduling!

I imagine something like this could work. #NotTested

public class MessageListener
{
private readonly IObservable<IMessage> _messages;
private readonly IScheduler _scheduler;

public MessageListener()
{
_scheduler = new EventLoopScheduler();

var messages = ListenToMessages()
.SubscribeOn(_scheduler)
.Publish();

_messages = messages;
messages.Connect();
}

public IObservable<IMessage> Messages
{
get {return _messages;}
}

private IObservable<IMessage> ListenToMessages()
{
return Observable.Create<IMessage>(o=>
{
return _scheduler.Schedule(recurse=>
{
try
{
var messages = GetMessages();
foreach (var msg in messages)
{
o.OnNext(msg);
}
recurse();
}
catch (Exception ex)
{
o.OnError(ex);
}
});
});
}

private IEnumerable<IMessage> GetMessages()
{
//Do some work here that gets messages from a queue,
// file system, database or other system that cant push
// new data at us.
//
//This may return an empty result when no new data is found.
}
}

The reason I really don't like Subjects, is that is usually a case of the developer not really having a clear design on the problem. Hack in a subject, poke it here there and everywhere, and then let the poor support dev guess at WTF was going on. When you use the Create/Generate etc methods you are localizing the effects on the sequence. You can see it all in one method and you know no-one else is throwing in a nasty side effect. If I see a subject fields I now have to go looking for all the places in a class it is being used. If some MFer exposes one publicly, then all bets are off, who knows how this sequence is being used!
Async/Concurrency/Rx is hard. You don't need to make it harder by allowing side effects and causality programming to spin your head even more.

Upgrading old .Net 35 Project to 4.x - Where do I download Reactive Framework 4.x

The Rx-* packages have been frozen at 2.2.5 because the packages themselves require breaking changes to support .NET Core. And rather than bumping the major version, they've decided to start fresh with System.Reactive* naming.

You can simply use the PM console:

Install-Package System.Reactive

You will need an updated version of the nuget package installer to be able to parse the 3.x packages. Here's the release announcement.



Related Topics



Leave a reply



Submit