Multiple subscriptions to Observable
To have multiple functions subscribe to a single Observable, just subscribe them to that observable; it is that simple. In fact, that is what you did.
But your code does not work because, after notificationArrayStream.subscribe((x) => console.log('b: ' + x)) is executed, observer is (x) => console.log('b: ' + x), so observer.next will give you b: TEST.
So basically it is your observable creation that is wrong. In create you are handed an observer as a parameter so that you can pass values to it. You need to generate those values through your own logic, but as you can see, your logic here is erroneous. I would recommend using a subject if you want to push values to the observer.
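Something like:
// Subject used to push values to every subscriber.
const mySubject = new Rx.Subject();
const notificationArrayStream = Rx.Observable.create(function (obs) {
  // Forward everything the subject emits to this observer.
  const subscription = mySubject.subscribe(obs);
  // Teardown: stop forwarding when the observer unsubscribes.
  return () => subscription.unsubscribe();
});
function trigger(something) {
  mySubject.next(something);
}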
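To make the subject idea concrete, here is a minimal sketch in plain JavaScript of what a subject does conceptually: it keeps a list of observers and forwards each pushed value to all of them. TinySubject is a hypothetical name used only for illustration; it is not the RxJS Subject implementation and leaves out error and completion handling.

```javascript
// Hypothetical minimal subject: an illustration, not the RxJS implementation.
class TinySubject {
  constructor() {
    this.observers = new Set();
  }

  // Register an observer callback; returns a function that unsubscribes it.
  subscribe(next) {
    this.observers.add(next);
    return () => this.observers.delete(next);
  }

  // Push a value to every currently subscribed observer.
  next(value) {
    // Copy first so that unsubscribing during delivery is safe.
    for (const observer of [...this.observers]) {
      observer(value);
    }
  }
}

const received = [];
const subject = new TinySubject();
subject.subscribe((x) => received.push('a: ' + x));
const unsubscribeB = subject.subscribe((x) => received.push('b: ' + x));

subject.next('TEST');  // delivered to both a and b
unsubscribeB();
subject.next('AGAIN'); // delivered to a only

console.log(received);
```

Because the value is pushed from outside the subscribers, every observer registered at that moment sees it, which is the behaviour the question was after.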
Multiple subscriptions to same Observable
There is nothing wrong with subscribing multiple times to a shared observable. That said, an observable can often be split into meaningful pieces that can be reused. There is no fixed rule on the granularity of each observable; each one should be defined to fit your application's requirements.
For example, suppose we have a "get blog post" button. If you want multiple components to listen to that event, you can just do
const onGetPostClick = fromEvent(button, 'click')
Then you want to execute the HTTP call that gets the post and also allow other components to listen to that event:
const onGetPost = onGetPostClick.pipe(mergeMap(e => fetch()....), share())
If you are only interested in posts in the news category:
const onGetNews = onGetPost.pipe(filter(post => post.cat === 'news'))
Give meaning to each observable and you will find your code much more DRY and minimal.
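The idea of deriving a narrower stream from a shared one can be sketched without any library. In the sketch below, createSource and filtered are hypothetical helpers standing in for a shared observable and the filter operator, and the title field on each post is an assumption made for the demo; only the cat field comes from the example above.

```javascript
// Hypothetical helpers for illustration; with RxJS you would use share() and filter().
function createSource() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    emit(value) { listeners.forEach((fn) => fn(value)); },
  };
}

// Derive a new stream that only forwards values matching the predicate.
function filtered(source, predicate) {
  return {
    subscribe(fn) {
      source.subscribe((value) => {
        if (predicate(value)) fn(value);
      });
    },
  };
}

const onGetPost = createSource();
const onGetNews = filtered(onGetPost, (post) => post.cat === 'news');

const allTitles = [];
const newsTitles = [];
onGetPost.subscribe((post) => allTitles.push(post.title));
onGetNews.subscribe((post) => newsTitles.push(post.title));

onGetPost.emit({ cat: 'news', title: 'Release notes' });
onGetPost.emit({ cat: 'sport', title: 'Match report' });

console.log(allTitles, newsTitles);
```

Each named stream has one job, so a component that only cares about news subscribes to onGetNews and never repeats the filtering logic.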
Single Observable with Multiple Subscribers
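You seem to be (implicitly) casting your ConnectableObservable back into a normal Observable. Note that it is .publish() that returns a ConnectableObservable; .share() is shorthand for .publish().refCount() and already returns a plain Observable. You might want to read up on the difference between hot and cold observables.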
Try
// publish() returns the ConnectableObservable that connect() is called on below.
ConnectableObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .publish();
Subscription subscription1 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable throwable) {
            }
            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors);
            }
        });
Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable throwable) {
            }
            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
        });
// Nothing is emitted until connect() is called; both subscribers then share one request.
testObservable.connect();
// Unsubscribe once the results are no longer needed.
subscription1.unsubscribe();
subscription2.unsubscribe();
Edit: You don't need to call connect() every time you want a new subscription; you only need it to start up the observable. I suppose you could use replay() to make sure all subsequent subscribers get all items produced:
ConnectableObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share()
        .replay();
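The idea behind replay() can be sketched in a few lines. The sketch is in plain JavaScript rather than Java to keep it short, and ReplayingSubject is a hypothetical name: it buffers every value without limit and is only meant to show why a late subscriber still sees earlier items, not how the RxJava operator is implemented.

```javascript
// Hypothetical replaying subject: an illustration of the idea behind replay(),
// not the RxJava operator.
class ReplayingSubject {
  constructor() {
    this.buffer = [];
    this.observers = [];
  }

  subscribe(next) {
    // A late subscriber first receives everything emitted so far...
    this.buffer.forEach((value) => next(value));
    // ...and then continues to receive live values.
    this.observers.push(next);
  }

  next(value) {
    this.buffer.push(value);
    this.observers.forEach((observer) => observer(value));
  }
}

const stream = new ReplayingSubject();
const early = [];
const late = [];

stream.subscribe((v) => early.push(v));
stream.next(1);
stream.next(2);
stream.subscribe((v) => late.push(v)); // subscribes after 1 and 2 were emitted
stream.next(3);

console.log(early, late);
```

Both arrays end up with the same items, even though the second subscriber arrived after the first two values had already been produced.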
Knockout single observable with multiple subscriptions
The observer/observable design pattern does allow for multiple observers/subscriptions. The purpose of the design pattern in short is:
- De-couple the change from the effects of the change.
- Allow for any and arbitrary effects to stem from a change.
So, Knockout does that through its observables.
var observable = ko.observable("a");
observable.subscribe(function(newValue) { console.log("observer 1", newValue)});
observable.subscribe(function(newValue) { console.log("observer 2", newValue)});
observable.subscribe(function(newValue) { console.log("observer 3", newValue)});
observable("b");
observable("c");
<script src="https://cdnjs.cloudflare.com/ajax/libs/knockout/3.4.2/knockout-min.js"></script>
RxJava, one observable multiple subscribers: publish().autoConnect()
This is because those are in fact two separate observables. They are "spawned" when you invoke subscribe(). Therefore the steps you have provided are incorrect in the sense that steps 3 & 4 are just steps 1 & 2 on a different observable.
But you see them as 1 1 1 2 2 2 because of the thread on which the logging happens. If you were to remove the observeOn() part, you would see the emissions interleaved. To see this, run the code below:
@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    Observable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation());
                    //.observeOn(single);
    dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));
    Thread.sleep(1000);
}
Output, at least in my run, was (notice the thread names):
1 RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580
and if you apply the observeOn() it becomes:
1 RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889
As you have correctly pointed out, to get what you want you need the publish().refCount() or simply share() (it is an alias) operator.
This is because publish() creates a ConnectableObservable, which does not start emitting items until told to do so via the connect() method. So if you do this:
@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    ConnectableObservable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation())
                    .observeOn(single)
                    .publish();
    dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2 " + (i - l)));
    Thread.sleep(1000);
    dataStream.connect();
    Thread.sleep(1000);
}
You will notice that for the first second (the first Thread.sleep() invocation) nothing happens, and the emissions only happen after dataStream.connect() is called.
refCount() takes in a ConnectableObservable and hides from subscribers the need to call connect() by counting how many subscribers are currently subscribed. Upon the first subscription it calls connect(), and after the last unsubscription it unsubscribes from the original observable.
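The reference-counting behaviour described above can be sketched as follows. The sketch is in plain JavaScript for brevity, and refCounted, connect and push are hypothetical names for the demo; it illustrates the counting idea only and is not how RxJava implements refCount().

```javascript
// Hypothetical refCount-style wrapper, for illustration only.
// `connect` starts the shared source and returns a function that stops it.
function refCounted(connect) {
  const observers = new Set();
  let disconnect = null;

  return {
    subscribe(next) {
      observers.add(next);
      if (observers.size === 1) {
        // First subscriber: connect to the source.
        disconnect = connect((value) => {
          for (const observer of [...observers]) observer(value);
        });
      }
      return () => {
        observers.delete(next);
        if (observers.size === 0 && disconnect) {
          // Last subscriber gone: disconnect from the source.
          disconnect();
          disconnect = null;
        }
      };
    },
  };
}

let connects = 0;
let disconnects = 0;
let push = null; // lets the demo emit values by hand

const shared = refCounted((emit) => {
  connects += 1;
  push = emit;
  return () => {
    disconnects += 1;
    push = null;
  };
});

const seen = [];
const stop1 = shared.subscribe((v) => seen.push('1 ' + v));
const stop2 = shared.subscribe((v) => seen.push('2 ' + v));
push('x');
stop1();
stop2();

console.log(connects, disconnects, seen);
```

Two subscribers result in a single connection to the source, and the source is released only when the second of them unsubscribes.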
As for publish() and autoConnect() seemingly cancelling each other out: you do get an ordinary observable back, but it has one special property. Say the original observable makes an API call over the Internet (lasting 10 seconds). If you use it without share(), you will end up with as many parallel queries to the server as there were subscriptions during those 10 seconds. With share(), on the other hand, you will have only one call.
You will not see any benefit if the shared observable completes its work very quickly (like just(1,2,3)).
autoConnect()/refCount() gives you an intermediate observable to which you subscribe instead of the original observable.
If you are interested, dive into this book: Reactive Programming with RxJava
Observable executing once with multiple subscriber
The behaviour you are seeing is correct. The observable returned by interval is cold. That is, no timer is created until an observer subscribes and, when one does, the timer that's created is specifically for that subscription.
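A small sketch can show what "cold" means in practice. It uses a synchronous counter instead of a real timer so that it runs instantly, and coldRange and producersStarted are hypothetical names for the demo, not part of RxJS.

```javascript
let producersStarted = 0;

// Hypothetical cold observable: nothing runs until subscribe() is called,
// and each call starts its own independent producer.
function coldRange(count) {
  return {
    subscribe(next) {
      producersStarted += 1;
      for (let i = 0; i < count; i += 1) {
        next(i);
      }
    },
  };
}

const source = coldRange(3);
// Creating the observable has not started any producer yet.
const startedBeforeSubscribe = producersStarted;

const first = [];
const second = [];
source.subscribe((v) => first.push(v));
source.subscribe((v) => second.push(v));

console.log(startedBeforeSubscribe, producersStarted, first, second);
```

Each subscriber gets its own full run of the producer, which is why two subscriptions to an unshared interval mean two timers.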
The behaviour you were expecting can be effected using the share operator:
observable = Rx.Observable
  .timer(0, 1000)
  .map(val => doSomething(val))
  .share();
The share operator reference counts subscriptions and multicasts the source observable to multiple subscribers, so there will be only a single interval/timer, shared between the two subscribers.
For more information, you might find this article useful.