Single Observable with Multiple Subscribers

Multiple subscriptions to Observable

To have multiple functions subscribe to a single Observable, just subscribe them to that observable. It is that simple, and that is actually what you did.

But your code does not work because, after notificationArrayStream.subscribe((x) => console.log('b: ' + x)) is executed, observer is (x) => console.log('b: ' + x), so observer.next will only give you b: TEST.

So basically it is your observable creation that is wrong. In create, you receive an observer as a parameter so that you can pass it values. You need to generate those values somehow through your own logic, but as you can see, your logic here is erroneous. I would recommend using a subject if you want to push values to the observers.

Something like:

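// Subject used to push values to every subscriber
const mySubject = new Rx.Subject();

const notificationArrayStream = Rx.Observable.create(function (obs) {
  // Forward everything the subject emits to this subscriber
  const subscription = mySubject.subscribe(obs);
  // Teardown: stop forwarding when the subscriber unsubscribes
  return () => subscription.unsubscribe();
});

function trigger(something) {
  mySubject.next(something);
}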
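To make the fan-out idea concrete without pulling in a library, here is a minimal hand-rolled sketch of what a subject does. MiniSubject and its methods are hypothetical names for illustration, not the RxJS implementation; the point is only that every subscriber is kept, rather than the last one overwriting the others.

```javascript
// Hand-rolled stand-in for a subject; NOT the RxJS implementation.
class MiniSubject {
  constructor() {
    this.observers = new Set();
  }

  // Register a callback and return a function that removes it again.
  subscribe(next) {
    this.observers.add(next);
    return () => this.observers.delete(next);
  }

  // Push a value to every callback that is currently subscribed.
  next(value) {
    for (const observer of [...this.observers]) {
      observer(value);
    }
  }
}

const received = [];
const subject = new MiniSubject();

const unsubscribeA = subject.subscribe((x) => received.push('a: ' + x));
subject.subscribe((x) => received.push('b: ' + x));

subject.next('TEST');  // both subscribers are notified
unsubscribeA();
subject.next('AGAIN'); // only b is still subscribed

console.log(received);
```

Because the observers live in a collection instead of a single variable, subscribing b does not replace a.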

Multiple subscriptions to same Observable

There is nothing wrong with subscribing multiple times to a shared observable. That said, an observable can often be split into meaningful parts that can be reused. There is no fixed rule on the granularity of each observable; each one should be defined to fit your application's requirements.

For example, say we have a "get blog post" button. If you want multiple components to listen to that event, you can just do

const onGetPostClick = fromEvent(button, 'click')

Then you want to execute the "get post" HTTP call and also allow other components to listen to that event.

const onGetPost = onGetPostClick.pipe(mergeMap(e => fetch()....), share())

If you are only interested in posts in the news category:

const onGetNews = onGetPost.pipe(filter(post => post.cat === 'news'))

Give each observable a meaning and you will find your code much more DRY and minimal.
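As a rough illustration of how a narrower stream can be derived from a shared one, here is a small library-free sketch. createSharedSketch, filterSketch and the sample posts are made up for this example and only mimic the shape of share() and filter(); they are not RxJS operators.

```javascript
// Hypothetical helper: a shared source that fans each value out to all listeners.
function createSharedSketch() {
  const listeners = [];
  return {
    subscribe: (fn) => { listeners.push(fn); },
    emit: (value) => listeners.forEach((fn) => fn(value)),
  };
}

// Hypothetical helper: derive a stream that only passes matching values on.
function filterSketch(source, predicate) {
  return {
    subscribe: (fn) => source.subscribe((value) => {
      if (predicate(value)) fn(value);
    }),
  };
}

const onGetPostSketch = createSharedSketch();
const onGetNewsSketch = filterSketch(onGetPostSketch, (post) => post.cat === 'news');

const allPosts = [];
const newsOnly = [];
onGetPostSketch.subscribe((post) => allPosts.push(post.title));
onGetNewsSketch.subscribe((post) => newsOnly.push(post.title));

// Made-up sample data standing in for HTTP responses.
onGetPostSketch.emit({ cat: 'news', title: 'Release notes' });
onGetPostSketch.emit({ cat: 'sports', title: 'Match report' });

console.log(allPosts, newsOnly);
```

Each consumer subscribes to the stream whose meaning matches what it needs, and the filtering logic lives in one place.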

Single Observable with Multiple Subscribers

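share() is equivalent to publish().refCount(), so it returns a plain Observable that connects by itself as soon as the first subscriber arrives; there is nothing to call connect() on. If you want to decide when the source starts emitting, use publish(), which returns a ConnectableObservable, and keep that type instead of (implicitly) casting it back to a normal Observable. You might want to read up on the difference between hot and cold observables.

Try

ConnectableObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .publish();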

Subscription subscription1 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors);
            }
        });

Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
        });

testObservable.connect();
subscription1.unsubscribe();
subscription2.unsubscribe();

Edit: You don't need to call connect() every time you want a new subscription; you only need it to start the observable. I suppose you could use replay() to make sure all subsequent subscribers get all items produced:

ConnectableObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share()
        .replay();
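To show what "subsequent subscribers get all items produced" means, here is a small library-free JavaScript sketch of the replay idea. ReplaySketch and the sample values are hypothetical; this is not how RxJava implements replay(), just an unbounded buffer that is played back to late subscribers.

```javascript
// Hand-rolled sketch of a replaying source; not RxJava's replay().
class ReplaySketch {
  constructor() {
    this.buffer = [];     // every value emitted so far
    this.observers = [];
  }

  subscribe(next) {
    // A late subscriber first catches up on everything it missed.
    this.buffer.forEach((value) => next(value));
    this.observers.push(next);
  }

  emit(value) {
    this.buffer.push(value);
    this.observers.forEach((next) => next(value));
  }
}

const early = [];
const late = [];
const replayed = new ReplaySketch();

replayed.subscribe((value) => early.push(value));
replayed.emit('page 1');
replayed.emit('page 2');

// Subscribes after two values were already emitted.
replayed.subscribe((value) => late.push(value));
replayed.emit('page 3');

console.log(early, late);
```

Both arrays end up with the same three values, even though the second subscriber arrived late. An unbounded buffer like this keeps every item in memory, which is the trade-off to keep in mind.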

Knockout single observable with multiple subscriptions

The observer/observable design pattern does allow for multiple observers/subscriptions. The purpose of the design pattern in short is:

  1. De-couple the change from the effects of the change.
  2. Allow for any and arbitrary effects to stem from a change.

So, Knockout does that through its observables.

var observable = ko.observable("a");
observable.subscribe(function (newValue) { console.log("observer 1", newValue); });
observable.subscribe(function (newValue) { console.log("observer 2", newValue); });
observable.subscribe(function (newValue) { console.log("observer 3", newValue); });
observable("b");
observable("c");

RxJava, one observable multiple subscribers: publish().autoConnect()

This is because those are in fact two separate observables. They are "spawned" when you invoke subscribe(). Therefore the steps you have provided are incorrect, in the sense that steps 3 and 4 are just steps 1 and 2 on a different observable.

But you see them as 1 1 1 2 2 2 because of the thread on which the logging happens. If you were to remove the observeOn() part, you would see the emissions interleaved. To see this, run the code below:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    Observable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation());
                    //.observeOn(single);

    dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));

    Thread.sleep(1000);
}

The output, at least in my run, was (notice the thread names):

1 RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580

If you apply observeOn(), it becomes:

1 RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889

As you have correctly pointed out, to get what you want you need the publish().refCount() operator, or simply share() (it is an alias).

This is because publish() creates a ConnectableObservable, which does not start emitting items until told to do so via the connect() method. So if you do this:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    ConnectableObservable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation())
                    .observeOn(single)
                    .publish();

    dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2 " + (i - l)));

    Thread.sleep(1000);
    dataStream.connect();
    Thread.sleep(1000);
}

You will notice that for the first second (the first Thread.sleep() invocation) nothing happens, and the emissions only happen after dataStream.connect() is called.

refCount() takes a ConnectableObservable and hides from subscribers the need to call connect() by counting how many subscribers are currently subscribed. Upon the first subscription it calls connect(), and after the last unsubscription it unsubscribes from the original observable.
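The counting logic described above can be sketched in a few lines of plain JavaScript. refCountSketch, pushValue and refLog are hypothetical names for this illustration and it is not RxJava's implementation; it only shows "connect on the first subscriber, disconnect after the last one leaves".

```javascript
// Hand-rolled sketch of reference counting; all names are hypothetical.
// `connect` starts the source and returns a function that stops it.
function refCountSketch(connect) {
  const subscribers = new Set();
  let disconnect = null;

  return {
    subscribe(next) {
      subscribers.add(next);
      if (subscribers.size === 1) {
        // First subscriber: start the source and fan values out to everyone.
        disconnect = connect((value) => {
          for (const subscriber of [...subscribers]) subscriber(value);
        });
      }
      // Returned function plays the role of unsubscribe.
      return () => {
        subscribers.delete(next);
        if (subscribers.size === 0 && disconnect) {
          // Last subscriber left: stop the source.
          disconnect();
          disconnect = null;
        }
      };
    },
  };
}

const refLog = [];
let pushValue = null;

const counted = refCountSketch((emit) => {
  refLog.push('connect');
  pushValue = emit;
  return () => {
    refLog.push('disconnect');
    pushValue = null;
  };
});

const stop1 = counted.subscribe((v) => refLog.push('1 got ' + v));
const stop2 = counted.subscribe((v) => refLog.push('2 got ' + v));
pushValue('x');
stop1(); // one subscriber remains, source keeps running
stop2(); // last subscriber gone, source is stopped

console.log(refLog);
```

Note that the source is connected once for both subscribers and is only torn down when the second one leaves.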

As for publish().autoConnect() seemingly cancelling each other out: you do get an observable back afterwards, but it has one special property. Say the original observable makes an API call over the Internet (lasting 10 seconds). When you use it without share(), you end up with as many parallel queries to the server as there were subscriptions over those 10 seconds. With share(), on the other hand, you have only one call.

You will not see any upside if the shared observable completes its work very fast (like just(1, 2, 3)).

autoConnect()/refCount() gives you an intermediate observable to which you subscribe instead of the original observable.

If you are interested dive into this book: Reactive Programming with RxJava

Observable executing once with multiple subscriber

The behaviour you are seeing is correct. The observable returned by interval is cold. That is, no timer is created until an observer subscribes and, when one does, the timer that's created is specifically for that subscription.

The behaviour you were expecting can be effected using the share operator:

observable = Rx.Observable
  .timer(0, 1000)
  .map(val => doSomething(val))
  .share();

The share operator reference counts subscriptions and multicasts the source observable to multiple subscribers - so there will be only a single interval/timer, shared between the two subscribers.
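The difference between the cold and the shared case can be made visible by counting how many producers get created. The sketch below is plain JavaScript with made-up names (coldTimerSketch, shareSketch, and a manual tick instead of a real timer); it is not the RxJS share operator and, to stay short, it ignores unsubscription.

```javascript
// Each entry is the "tick" function of one producer that was created.
const producers = [];

// A cold source: every subscribe() call creates its own counter.
const coldTimerSketch = {
  subscribe(next) {
    let count = 0;
    producers.push(() => next(count++));
  },
};

// Simplified sharing: subscribe to the source once, fan out to everyone.
function shareSketch(source) {
  const observers = [];
  let started = false;
  return {
    subscribe(next) {
      observers.push(next);
      if (!started) {
        started = true;
        source.subscribe((value) => observers.forEach((o) => o(value)));
      }
    },
  };
}

// Unshared: two subscriptions create two independent producers.
coldTimerSketch.subscribe(() => {});
coldTimerSketch.subscribe(() => {});
const unsharedProducers = producers.length;

// Shared: two subscriptions, but only one producer.
producers.length = 0;
const sharedTimer = shareSketch(coldTimerSketch);
const seenA = [];
const seenB = [];
sharedTimer.subscribe((v) => seenA.push(v));
sharedTimer.subscribe((v) => seenB.push(v));
const sharedProducers = producers.length;

producers[0](); // first tick
producers[0](); // second tick

console.log(unsharedProducers, sharedProducers, seenA, seenB);
```

With the shared version both subscribers see the same ticks from the single producer, which is the behaviour the question was expecting.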

For more information, you might find this article useful.


