When Should One Use Rxjava Observable and When Simple Callback on Android

When should one use RxJava Observable and when simple Callback on Android?

For simple networking stuff, the advantages of RxJava over Callback is very limited. The simple getUserPhoto example:

RxJava:

api.getUserPhoto(photoId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Photo>() {
@Override
public void call(Photo photo) {
// do some stuff with your photo
}
});

Callback:

api.getUserPhoto(photoId, new Callback<Photo>() {
@Override
public void onSuccess(Photo photo, Response response) {
}
});

The RxJava variant is not much better than the Callback variant. For now, let's ignore the error handling.
Let's take a list of photos:

RxJava:

api.getUserPhotos(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<List<Photo>, Observable<Photo>>() {
@Override
public Observable<Photo> call(List<Photo> photos) {
return Observable.from(photos);
}
})
.filter(new Func1<Photo, Boolean>() {
@Override
public Boolean call(Photo photo) {
return photo.isPNG();
}
})
.subscribe(
new Action1<Photo>() {
@Override
public void call(Photo photo) {
list.add(photo)
}
});

Callback:

api.getUserPhotos(userId, new Callback<List<Photo>>() {
@Override
public void onSuccess(List<Photo> photos, Response response) {
List<Photo> filteredPhotos = new ArrayList<Photo>();
for(Photo photo: photos) {
if(photo.isPNG()) {
filteredList.add(photo);
}
}
}
});

Now, the RxJava variant still isn't smaller, although with Lambdas it would be getter closer to the Callback variant.
Furthermore, if you have access to the JSON feed, it would be kind of weird to retrieve all photos when you're only displaying the PNGs. Just adjust the feed to it only displays PNGs.

First conclusion

It doesn't make your codebase smaller when you're loading a simple JSON that you prepared to be in the right format.

Now, let's make things a bit more interesting. Let's say you not only want to retrieve the userPhoto, but you have an Instagram-clone, and you want to retrieve 2 JSONs:
1. getUserDetails()
2. getUserPhotos()

You want to load these two JSONs in parallel, and when both are loaded, the page should be displayed.
The callback variant will become a bit more difficult: you have to create 2 callbacks, store the data in the activity, and if all the data is loaded, display the page:

Callback:

api.getUserDetails(userId, new Callback<UserDetails>() {
@Override
public void onSuccess(UserDetails details, Response response) {
this.details = details;
if(this.photos != null) {
displayPage();
}
}
});

api.getUserPhotos(userId, new Callback<List<Photo>>() {
@Override
public void onSuccess(List<Photo> photos, Response response) {
this.photos = photos;
if(this.details != null) {
displayPage();
}
}
});

RxJava:

private class Combined {
UserDetails details;
List<Photo> photos;
}

Observable.zip(api.getUserDetails(userId), api.getUserPhotos(userId), new Func2<UserDetails, List<Photo>, Combined>() {
@Override
public Combined call(UserDetails details, List<Photo> photos) {
Combined r = new Combined();
r.details = details;
r.photos = photos;
return r;
}
}).subscribe(new Action1<Combined>() {
@Override
public void call(Combined combined) {
}
});

We are getting somewhere! The code of RxJava is now as big as the callback option. The RxJava code is more robust;
Think of what would happen if we needed a third JSON to be loaded (like the latest Videos)? The RxJava would only need a tiny adjustment, while the Callback variant needs to be adjusted in multiple places (on each callback we need to check if all data is retrieved).

Another example; we want to create an autocomplete field, which loads data using Retrofit.
We don't want to do a webcall every time an EditText has a TextChangedEvent. When typing fast, only the last element should trigger the call.
On RxJava we can use the debounce operator:

inputObservable.debounce(1, TimeUnit.SECONDS).subscribe(new Action1<String>() {
@Override
public void call(String s) {
// use Retrofit to create autocompletedata
}
});

I won't create the Callback variant but you will understand this is much more work.

Conclusion:
RxJava is exceptionally good when data is sent as a stream. The Retrofit Observable pushes all elements on the stream at the same time.
This isn't particularly useful in itself compared to Callback. But when there are multiple elements pushed on the stream and different times, and you need to do timing-related stuff, RxJava makes the code a lot more maintainable.

Replace callbacks with observables from RxJava

For example you can use Observable.fromCallable to create observable with your data.

public Observable<Data> getData(){
return Observable.fromCallable(() -> {
Data result = null;
//do something, get your Data object
return result;
});
}

then use your data

 getData().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
//do something with your data
}, error -> {
//do something on error
});

Used rxjava 1.x and lambda expressions.

edit:

if I understand you well, you wanted to replace that listener, not wrap it into observable. I added other example in reference to your comment. Oh.. also you should use Single if you are expecting only one item.

public Single<Data> getData() {
return Single.create(singleSubscriber -> {
Data result = object.getData();
if(result == null){
singleSubscriber.onError(new Exception("no data"));
} else {
singleSubscriber.onSuccess(result);
}
});
}

getData().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
//do something with your data
}, error -> {
//do something on error
});

RxJava Android wait for callback to finish before returning data

It looks like the Cognito SDK already provides an async way to get information. In order for you to wrap this into an rx stream, you should consider using a Subject.

Subject are both Observables capable of emitting data, and Observers capable of receiving data. A Subject can wait to receive the callback data, take the data, and then emit it onto a stream.

public Observable<CognitoUser> SignUp(CreateParams createParams){
BehaviorSubject<CognitoUser> subject = BehaviorSubject.create();

// ...

SignUpHandler signupCallback = new SignUpHandler() {

@Override
public void onSuccess(CognitoUser cognitoUser, boolean userConfirmed, CognitoUserCodeDeliveryDetails cognitoUserCodeDeliveryDetails) {
// Sign-up was successful

// Check if this user (cognitoUser) needs to be confirmed
if(!userConfirmed) {
// This user must be confirmed and a confirmation code was sent to the user
// cognitoUserCodeDeliveryDetails will indicate where the confirmation code was sent
// Get the confirmation code from user
Timber.d("Sent confirmation code");
}
else {
// The user has already been confirmed
Timber.d("User has already been confirmed.");
}

subject.onNext(cognitoUser);
subject.onComplete();
}

@Override
public void onFailure(Exception exception) {
subject.onError(exception);
}
};

userPool.signUpInBackground(userId, password, userAttributes, null, signupCallback);
return subject;
}

Do an action before any subscriber callback of an Observable is called in RxJava

To answer my own question: the piece of code was a bit complex and not mine, so I didn't understand it well when asking the question.

Basically the code was like below; the observable emits 1 or 2 values.
The code relied on onNext callbacks of subscriber. Most of the time (surprisingly) it worked fine, i.e. onNext would see the value of the boolean as the original author wanted it to be, but sometimes the onAfterTerminate would executed only after all onNext callbacks.

     private Observable<...> getData() {
return dataStore.getData()
.doOnNext(...)
.map(...)
.doOnSubscribe(() -> myBoolean = true)
.doAfterTerminate(() -> myBoolean = false)
.map(...)
}

Subscription mySub = getData()
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribe(onNextDoSomethingWithABoolean,
onErrorDoSomethingWithABoolean);

The main fix was to add onCompleted callback to the subscription and work with the boolean there:

    Subscription mySub = getData()
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribe(onNextDoSomethingWithABoolean,
onErrorDoSomethingWithABoolean,
onCompletedDoSomethingWithTheBoolean);

Apart from that I also:

  • changed from boolean to AtomicBoolean
  • changed doAfterTerminate to doOnTerminate

Anyway this piece of code can probably be rewritten to avoid the mutable state at all.

RxJava - When and why to use Observable.share()

There are two kinds of Observables: cold and hot. Cold Observables start producing items when there is an Observer to them and they do it on an individual basis. Subscribing with multiple Observers will result in multiple runs of the same cold Observable. An example of this is an Observable that issues a network request.

In contrast, a hot Observable can produce items with or without the presence of Observers. These Observables usually multicast the same items to all of their current Observers. An example of this is an Observable of button clicks.

To turn a cold Observable into a hot one, you can use publish() which will make sure only a single subscription is established to the source Observable no matter how many Observers there are. Thus, the chain will now act as a hot multicasting Observable from the Observers' perspective.

However, often it is not economic to keep an originally cold Observable running after all Observers of publish() have gone away, therefore the refCount() operator can be used to keep track of them and stop the source when there are no interesting parties left.

If your source is already hot, you don't need share().

Why is not share() the default behavior for all observables?

Actually, this property is one of the important contributions of the modern reactive programming paradigm: the distinction of the hot and cold Observables.

However, multicasting is expensive on itself because you have to keep track of the set of current Observers in order to notify them with new events, and this can lead to all sorts of logical race conditions to be defended against. A cold Observable is known to talk only to a single Observer so there is no need for the extra tracking overhead.

Rxjava newbee question around chosing an observable

If you don't care about the exception at all, you can use merge to observe both and onErrorComplete each to ignore the error:

observeA = Observable A
observeB = Observable B

Observable.merge(
observeA.onErrorComplete(),
observeB.onErrorComplete()
).subscribeOn(blah)


Related Topics



Leave a reply



Submit