What are the semantics of different RxJS subjects?
Semantics differ according to the type of subjects. I will divide them in two kinds : vanilla (Rx.Subject
), and special-purpose subjects (the other three). The special-purpose subjects share part of the semantics of the vanilla subject with a few caveats due to their specialization (for instance, completion/reconnection behaviour).
Vanilla Rx.Subject semantics
Key features
- subjects implement the observer, observable interface (and the disposable interface as they have a
dispose
handler on their prototype). That means, among other things, they have:- observer interface :
onNext
,onError
,onComplete
method - observable interface :
subscribe
method
- observer interface :
- you can cast a subject to an observer or to an observable, hiding the implementation of the extra interfaces (cf.
.asObserver()
, and.asObservable()
) if need be - the subject being an observable, you can subscribe several observers to it. That observable will then broadcast its data to all its observers. Internally the subject maintains an array of observers.
- the subject being an observer, you can subscribe it to any observable
- the observer and the observable which compose the subject being two distinct entities, you can use them independently of the other if that's your use case.
dispose
ing a subject will unsubscribe all observers and release resources.- Subjects do not take a scheduler but rather assume that all serialization and grammatical correctness are handled by the caller of the subject.
- The default behaviour of subjects is to emit synchronously their values to the observers, starting with the first subscribed observer to the last. In most cases, order will not matter, in others it will.
- subjects implement the observer, observable interface (and the disposable interface as they have a
I quote a key aspect of Rxjs contract and grammar :
This grammar allows observable sequences to send any amount (0 or more) of onNext messages to the subscribed observer instance, optionally followed by a single success (onCompleted) or failure (onError) message.
a vanilla subject (created with
new Rx.Subject()
) implements that grammar : whenonCompleted
has been called once, all subsequent calls toonNext
are ignored. Second call ofonCompleted
on the same observer is also ignored. If an observer subscribes to the observable side of the subject, itsonComplete
callback will immediately be called (http://jsfiddle.net/cLf6Lqsn/1/).Creation
new Rx.Subject()
Returns a subject which connects its observer to its observable (jsfiddle). This example is taken from the official documentation and portrays how to use subjects as proxies. The subject is subscribed to a source (observer side), and is also listened on by observers (observable side). Any call to
onNext
(observer side) results in the observable side callingonNext
with the same value for each of its observers.Rx.Subject.create(observer, observable)
Creates a subject from the specified observer and observable. Those two are not necessarily connected. A good example can be seen in the implementation of
Rx.Dom.fromWebSocket
which returns a subject used to send and receive data from a socket. The observer side of the subject sends data to the socket. The observable side is used to listen on incoming data from the socket. Also, a subject created this way does NOT have adispose
method.
Specialized Rx.Subject semantics
- This
reactivex.io
documentation covers pretty well most of the semantics of the specialized subjects. - The other interesting points to mention concern behavior past completion.
- Sample code illustrating the behaviour are here for async, behavior, replay
Hopefully I did not get too much wrong. I'll be happy to be corrected. Last note, this should be valid for RxJS v4.
For a detailed explanation of the behaviour of cold/hot observables, one can refer to : Hot and Cold observables : are there 'hot' and 'cold' operators?
Rxjs observable versus observer 'semantics'
Observable-Observer pattern is a Push mechanism, means that it is the mission of Observable to notify Observer. Observable needs a reference to Observer to notify it about new emitions. Observable callbacks such as onNext and onError are the bridge between Observable-Observer so such callbacks exists in Observer and Observable will call them.
Subject vs BehaviorSubject vs ReplaySubject in Angular
It really comes down to behavior and semantics. With a
Subject
- a subscriber will only get published values that were emitted after the subscription. Ask yourself, is that what you want? Does the subscriber need to know anything about previous values? If not, then you can use this, otherwise choose one of the others. For example, with component-to-component communication. Say you have a component that publishes events for other components on a button click. You can use a service with a subject to communicate.BehaviorSubject
- the last value is cached. A subscriber will get the latest value upon initial subscription. The semantics for this subject is to represent a value that changes over time. For example a logged in user. The initial user might be an anonymous user. But once a user logs in, then the new value is the authenticated user state.The
BehaviorSubject
is initialized with an initial value. This is sometimes important to coding preference. Say for instance you initialize it with anull
. Then in your subscription, you need to do a null check. Maybe OK, or maybe annoying.ReplaySubject
- it can cache up to a specified number of emissions. Any subscribers will get all the cached values upon subscription. When would you need this behavior? Honestly, I have not had any need for such behavior, except for the following case:If you initialize a
ReplaySubject
with a buffer size of1
, then it actually behaves just like aBehaviorSubject
. The last value is always cached, so it acts like a value changing over time. With this, there is no need for anull
check like in the case of theBehaviorSubject
initialized with anull
. In this instance, no value is ever emitted to the subscriber until the first publishing.
So it really comes down to the behavior you are expecting (as for which one to use). Most of the time you will probably want to use a BehaviorSubject
because what you really want to represent is that "value over time" semantic. But I personally don't see anything wrong with the substitution of ReplaySubject
initialized with 1
.
What you want to avoid is using the vanilla Subject
when what you really need is some caching behavior. Take for example you are writing a routing guard or a resolve. You fetch some data in that guard and set it in a service Subject
. Then in the routed component you subscribe to the service subject to try to get that value that was emitted in the guard. OOPs. Where's the value? It was already emitted, DUH. Use a "caching" subject!
See also:
- What are RxJS Subject's and the benefits of using them?
Understanding RxJS debounce semantics
What the debounceTime
operator does is delay the source observable emission by the specified timeframe. If another source value is emitted during the delay it:
- Drops previous value
- Resets the interval timer
- Wait to see whether the interval will pass without more source values being emitted.
- Finally if the are no values emitted during the interval it will emit the last value.
RxJSs marble test diagram illustrates this quite well. Lets review a few cases.
When we have source.debountTime(20)
,
- all source value emission occurs after 20
const source = '-a--------b------';
const expected = '---a--------b----';
all of source values are just simply delayed.
- if some source value emitted subsequently in 20ms
const source = '-a--bc--d---|';
const expected = '---a---c--d-|';
emitted a
delayed by 20ms, b
dropped since c
emitted before timespan of 20ms for next emission, then d
is delayed by 20ms after c
does.
To summarize the debounceTime
operator functions as a rate limiter only allowing a single value to be emitted in given timeframe.
Hot and Cold observables: are there 'hot' and 'cold' operators?
I am coming back a few months later to my original question and wanted to share the gained knowledge in the meanwhile.
I will use the following code as an explanation support (jsfiddle):
var ta_count = document.getElementById('ta_count');
var ta_result = document.getElementById('ta_result');
var threshold = 3;
function emits ( who, who_ ) {return function ( x ) {
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}
var messages$ = Rx.Observable.create(function (observer){
var count= 0;
setInterval(function(){
observer.onNext(++count);
}, 1000)
})
.do(emits(ta_count, 'count'))
.map(function(count){return count < threshold})
.do(emits(ta_result, 'result'))
messages$.subscribe(function(){});
As mentioned in one of the answers, defining an observable leads to a series of callback and parameter registration. The data flow has to be kicked in, and that is done via the subscribe
function.
A (simplified for illustration) detailed flow can be find thereafter.
Observables are cold by default. Subscribing to an observable will result in an upstream chain of subscriptions taking place. The last subscription leads to the execution of a function which will handle a source and emit its data to its observer.
That observer in its turn emits to the next observer, resulting in a downstream flow of data, down to the sink observer. The following simplified illustration shows subscription and data flows when two subscribers subscribe to the same observable.
Hot observables can be created either by using a subject, or through the multicast
operator (and its derivatives, see Note 3 below).
The multicast
operator under the hood makes use of a subject and returns a connectable observable. All subscriptions to the operator will be subscriptions to the inner subject. When connect
is called, the inner subject subscribes to the upstream observable, and data flows downstream.
Subjects manipulate internally a list of subscribed observers and multicast incoming data to all subscribed observers.
The following diagram summarizes the situation.
In the end, it matters more to understand the flow of data caused by the observer pattern and the implementation of the operators.
For instance, if obs
is hot, is hotOrCold = obs.op1
cold or hot? Whatever the answer is :
- if there are no subscribers to
obs.op1
, no data will flow throughop1
. If there were subscribers to hotobs
, that meansobs.op1
will have possibly lost pieces of data - supposing that
op1
is not a multicast-like operator, subscribing twice tohotOrCold
will subscribe twice toop1
, and every value fromobs
will flow twice throughop1
.
Notes :
- This information should be valid for Rxjs v4. While the version 5 has gone
through considerable changes, most of it still applies verbatim. - Unsubscription, error and completion flows are not represented, as
they are not in the scope of the question. Schedulers are also not
taken into account. Among other things, they influence the timing of
the data flow, but a priori not its direction and content. - According to the type of subject used for multicasting, there are
different derived multicasting operators:
Subject type | `Publish` Operator | `Share` operator
------------------ | --------------------------- | -----------------
Rx.Subject | Rx.Observable.publish | share
Rx.BehaviorSubject | Rx.Observable.publishValue | shareValue
Rx.AsyncSubject | Rx.Observable.publishLast | N/A
Rx.ReplaySubject | Rx.Observable.replay | shareReplay
Update : See also the following articles, here, and there) on that subject by Ben Lesh.
Further details on subjects can be found in this other SO question : What are the semantics of different RxJS subjects?
What is the purpose of next('r') in the context of an RxJS Subject
It's a good question, I definitely recommend you to read about hot and cold Observables.
- cold Observables execute each time someone
subscribe
s to it.
const a$ = of(5).pipe(tap(console.log))
a$.subscribe(); // the 'tap' will be executed here
a$.subscribe(); // and here, again.
- hot Observables do not care about subscriptions in terms of execution:
const a$ = of(5).pipe(
tap(console.log),
shareReplay(1)
);
a$.subscribe(); // the 'tap' will be executed here
a$.subscribe(); // but not here! console.logs only once
In your example you are using Subject
that represents cold Observable.
You can try to use BehaviorSubject
or ReplaySubject
- both of them are hot but be aware that they behave differently.
IN you example you can modify your Subject
like the following:
const mySubject = new Subject();
const myStream$ = mySubject.pipe(
shareReplay(1)
);
myStream$.subscribe(x => console.log(x))
mySubject.next(1);
mySubject.next(2);
mySubject.next(3);
Related Topics
JavaScript Xmlhttprequest Using JSONp
Jquery Difference Between Change and Click Event of Checkbox
Does Console.Log Invokes Tostring Method of an Object
Chrome Extension: How to Save a File on Disk
Typescript Export VS. Default Export
When Should I Use ? (Nullish Coalescing) VS || (Logical Or)
Browser Event When Downloaded File Is Saved to Disk
Which Edition of Ecma-262 Does Google Apps Script Support
Why Use Object.Prototype.Hasownproperty.Call(Myobj, Prop) Instead of Myobj.Hasownproperty(Prop)
How to Submit a Form Using Phantomjs
How to Add a Key Prop to a React Fragment
Why Define an Anonymous Function and Pass It Jquery as the Argument
Chrome Extension: Port Error: Could Not Establish Connection. Receiving End Does Not Exist
How to Do Two-Way Filtering in Angularjs
Firefox Error Rendering an Svg Image to HTML5 Canvas with Drawimage