Why Are Subjects Not Recommended in .Net Reactive Extensions

Why are Subjects not recommended in .NET Reactive Extensions?

Ok,
If we ignore my dogmatic ways and ignore "subjects are good/bad" all together. Let us look at the problem space.

I bet you either have 1 of 2 styles of system you need to ingrate to.

  1. The system raises an event or a call back when a message arrives
  2. You need to poll the system to see if there are any message to process

For option 1, easy, we just wrap it with the appropriate FromEvent method and we are done. To the Pub!

For option 2, we now need to consider how we poll this and how to do this effciently. Also when we get the value, how do we publish it?

I would imagine that you would want a dedicated thread for polling. You wouldn't want some other coder hammering the ThreadPool/TaskPool and leaving you in a ThreadPool starvation situation. Alternatively you don't want the hassle of context switching (I guess). So assume we have our own thread, we will probably have some sort of While/Sleep loop that we sit in to poll. When the check finds some messages we publish them. Well all of this sounds perfect for Observable.Create. Now we probably cant use a While loop as that wont allow us to ever return a Disposable to allow cancellation. Luckily you have read the whole book so are savvy with Recursive scheduling!

I imagine something like this could work. #NotTested

public class MessageListener
{
private readonly IObservable<IMessage> _messages;
private readonly IScheduler _scheduler;

public MessageListener()
{
_scheduler = new EventLoopScheduler();

var messages = ListenToMessages()
.SubscribeOn(_scheduler)
.Publish();

_messages = messages;
messages.Connect();
}

public IObservable<IMessage> Messages
{
get {return _messages;}
}

private IObservable<IMessage> ListenToMessages()
{
return Observable.Create<IMessage>(o=>
{
return _scheduler.Schedule(recurse=>
{
try
{
var messages = GetMessages();
foreach (var msg in messages)
{
o.OnNext(msg);
}
recurse();
}
catch (Exception ex)
{
o.OnError(ex);
}
});
});
}

private IEnumerable<IMessage> GetMessages()
{
//Do some work here that gets messages from a queue,
// file system, database or other system that cant push
// new data at us.
//
//This may return an empty result when no new data is found.
}
}

The reason I really don't like Subjects, is that is usually a case of the developer not really having a clear design on the problem. Hack in a subject, poke it here there and everywhere, and then let the poor support dev guess at WTF was going on. When you use the Create/Generate etc methods you are localizing the effects on the sequence. You can see it all in one method and you know no-one else is throwing in a nasty side effect. If I see a subject fields I now have to go looking for all the places in a class it is being used. If some MFer exposes one publicly, then all bets are off, who knows how this sequence is being used!
Async/Concurrency/Rx is hard. You don't need to make it harder by allowing side effects and causality programming to spin your head even more.

Is there a Subject implementation in Rx.NET that functionally resembles BehaviorSubject but emits only if the value has changed?

After glancing a bit at the source code of the BehaviorSubject<T> class, it seems that your DistinctSubject<T> implementation will behave differently in case an OnError is followed by an OnNext:

var subject = new DistinctSubject<int>(2021);
subject.OnError(new ApplicationException());
subject.OnNext(2022); // throws ApplicationException

This will throw, while doing the same with the BehaviorSubject<T> will not throw (the OnNext is just ignored).

My suggestion is to use the DistinctUntilChanged operator in the implementation, like this:

public class DistinctSubject<T> : ISubject<T>, IDisposable
{
private readonly BehaviorSubject<T> _subject;
private readonly IObservable<T> _distinctUntilChanged;

public DistinctSubject(T initialValue, IEqualityComparer<T> comparer = default)
{
_subject = new BehaviorSubject<T>(initialValue);
_distinctUntilChanged = _subject.DistinctUntilChanged(
comparer ?? EqualityComparer<T>.Default);
}

public T Value => _subject.Value;
public void OnNext(T value) => _subject.OnNext(value);
public void OnError(Exception error) => _subject.OnError(error);
public void OnCompleted() => _subject.OnCompleted();

public IDisposable Subscribe(IObserver<T> observer) =>
_distinctUntilChanged.Subscribe(observer);

public void Dispose() => _subject.Dispose();
}

If you are worried about the needless allocation of an object, then you are not already familiar with the spirit of Rx. This library is about features and ease of use, not about performance or efficiency!

Reactive extensions Subject uses

First, a lot of folks will tell you Subject<T> doesn't belong, since it goes against some other tenets/patterns in the Rx framework.

That said, they act as either an IObservable or an IObserver, so you get some useful functionality out of them - I generally use them during the initial development stages for:

  • A "debug point" of sorts, where I can subscribe to an IObservable chain inline with a Subject<T>, and inspect the contents with the debugger.

  • An "observable on demand", where I can manually call OnNext and pass in data I want to inject into the stream

  • Used to use them to replicate what ConnectableObserable now does - a "broadcast" mechanism for multiple subscribers to a single Observable, but that can be done with Publish now.

  • Bridging layer between disparate systems; again, this is largely unnecessary now with the various FromAsync, FromEvent extensions, but they can still be used as such (basically, the "old" system injects events into the Subject<T> via OnNext, and from then on the normal Rx flow.

Should I be calling Dispose on Reactive Extensions (Rx) SubjectT

Nope, you don't really need to do this. When worrying about memory usage and lifetimes, think about disposing Subscriptions, not Subjects.

Alternative to using Subject in reactive programming?

It's usually a good idea to avoid subjects. In your code you're exposing the subject directly to the calling code. Any consumer that does ((Subject<string>)socket.Messages).OnCompleted(); will stop your code working.

You're also newing up a WebSocket which should be disposed of afterwards.

There is a way to get ride of the subject and make it all behave a lot better.

Try this:

public IObservable<string> Connect(Uri uri)
{
return
Observable
.Using(
() =>
{
var webSocket = new MessageWebSocket();
webSocket.Control.MessageType = SocketMessageType.Utf8;
return webSocket;
},
webSocket =>
Observable
.FromAsync(() => webSocket.ConnectAsync(uri))
.SelectMany(u =>
Observable
.FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
.SelectMany(pattern =>
Observable
.Using(
() =>
{
var reader = pattern.EventArgs.GetDataReader();
reader.UnicodeEncoding = UnicodeEncoding.UTF8;
return reader;
},
reader => Observable.Return(reader.ReadString(reader.UnconsumedBufferLength))))));

}

Here's how to avoid the subject with your existing code style:

public IObservable<string> ConnectAsync(Uri uri)
{
return
Observable
.Create<string>(async o =>
{
var webSocket = new MessageWebSocket();

webSocket.Control.MessageType = SocketMessageType.Utf8;

var subscription = Observable
.FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
.Select(ReadString)
.Subscribe(o);

await webSocket.ConnectAsync(uri);

return subscription;
});
}

Here's a quick test that this works:

void Main()
{
Connect(new Uri("https://stackoverflow.com/")).Subscribe(x => Console.WriteLine(x.Substring(0, 24)));
}

public IObservable<string> Connect(Uri uri)
{
return
Observable
.Create<string>(async o =>
{
var webClient = new WebClient();

webClient.UseDefaultCredentials = true;

var subscription =
Observable
.Using(
() => new CompositeDisposable(webClient, Disposable.Create(() => Console.WriteLine("Disposed!"))),
_ =>
Observable
.FromEventPattern<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
h => webClient.DownloadStringCompleted += h, h => webClient.DownloadStringCompleted -= h)
.Take(1))
.Select(x => x.EventArgs.Result)
.Subscribe(o);

await webClient.DownloadStringTaskAsync(uri);

return subscription;
});
}

Note that "Disposed!" is displayed showing that the WebClient is disposed.

RX Subjects - are they to be avoided?

Erik Meijer is thinking in a purely functional way - Subjects are the mutable variables of Rx. So, in general usage he's right - using Subjects is sometimes a way to cop out of Thinking Functionally, and if you use them too much, you're trying to row upstream.

However! Subject are extremely useful when you're interfacing with the non-Functional world of .NET. Wrapping an event or callback method? Subjects are great for that. Trying to put an Rx "interface" onto some existing code? Use a Subject!

Reactive Extensions in .Net (C#) - SubjectT instance only processing one subscription

The simplest way to do this is to only subscribe to OnNext messages, as Paulo's suggests. You can also remove or suppress OnCompleted messages from the observable.

However, the best way to do this is to probably replace the messenger Subject. I don't know your larger use case, but a good goal is to minimize Subject use. They're great for learning, and terrible for applications.

In this case, you can replace the whole code with a one-liner:

Observable.Merge(
Observable.Return<string>("File 1"),
Observable.Return<string>("File 2")
)
.Where(o => o.Length > 0)
.Subscribe(file => Console.WriteLine("got file request: " + file));

If you wanted to simulate dynamically adding observables, you would probably want to do that with a Subject<IObservable<string>>:

Subject<IObservable<string>> consoleWriter = new Subject<IObservable<string>>();
consoleWriter
.Merge()
.Where(o => o.Length > 0)
.Subscribe(file => Console.WriteLine("got file request: " + file));

var pathObservable = Observable.Return<string>("File 1");
consoleWriter.OnNext(pathObservable);

var pathObservable2 = Observable.Return<string>("File 2");
consoleWriter.OnNext(pathObservable2);


Related Topics



Leave a reply



Submit