RxJava Instead of AsyncTask

Replace AsyncTask with RxJava

RxJava makes this pretty straightforward. You could write the background call like this:

    private void addTeamInBackground(Team team) {
        Observable.fromCallable(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    teamDao.addTeam(team);
                    // RxJava does not accept a null return value; null is treated as a failure.
                    // So just return true.
                    return true;
                }
            })
            // Execute on the IO thread, i.e. a background thread.
            .subscribeOn(Schedulers.io())
            // Report or post the result to the main thread.
            .observeOn(AndroidSchedulers.mainThread())
            // Subscribing starts the work.
            .subscribe();
    }

Or you can write it in Java 8 Lambda style:

    private void addTeamInBackground(Team team) {
        Observable.fromCallable(() -> {
                teamDao.addTeam(team);
                // RxJava does not accept a null return value; null is treated as a failure.
                // So just return true.
                return true;
            })
            // Execute on the IO thread, i.e. a background thread.
            .subscribeOn(Schedulers.io())
            // Report or post the result to the main thread.
            .observeOn(AndroidSchedulers.mainThread())
            // Subscribing starts the work.
            .subscribe();
    }

If you care about the result, you can pass an Observer with more callbacks to the subscribe() method:

        .subscribe(new Observer<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Boolean success) {
                // On success. Called on the main thread, as defined by .observeOn(AndroidSchedulers.mainThread()).
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

RxJava instead of AsyncTask?

The full power of RxJava is visible when you use it on Java 8, preferably with a library like Retrofit. It allows you to trivially chain operations together, with full control of error handling. For example, consider the following code, given id (an int that specifies the order) and apiClient (a Retrofit client for the order management microservice):

    apiClient
        .getOrder(id)
        .subscribeOn(Schedulers.io())
        .flatMapIterable(Order::getLineItems)
        .flatMap(lineItem ->
            apiClient.getProduct(lineItem.getProductId())
                .subscribeOn(Schedulers.io())
                .map(product -> product.getCurrentPrice() * lineItem.getCount()),
            5)
        .reduce((a, b) -> a + b)
        .retry((count, e) -> count <= 2 && (e instanceof RetrofitError))
        .onErrorReturn(e -> -1)
        .subscribe(System.out::println);

This will asynchronously calculate the total price of an order, with the following properties:

  • at most 5 requests against the API in flight at any one time (and you can tweak the IO scheduler to have a hard cap for all requests, not just for a single observable chain)
  • up to 2 retries in case of network errors
  • -1 in case of failure (an antipattern TBH, but that's another discussion)

Also, IMO the .subscribeOn(Schedulers.io()) after each network call should be implicit - you can do that by modifying how you create the Retrofit client. Not bad for 11+2 lines of code, even if it's more backend-ish than Android-ish.
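To make the three properties concrete without pulling in RxJava or Retrofit, here is a minimal plain-JDK sketch of the same behaviour: a fixed-size thread pool stands in for the concurrency cap, a loop stands in for the retry, and a fallback value stands in for onErrorReturn. Everything in it (OrderTotalSketch, totalOnce, total, the use of IOException as the "network error") is a hypothetical name or assumption made for illustration, not part of any library mentioned here.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK illustration only; all names are hypothetical and none of this is RxJava API.
final class OrderTotalSketch {

    /** One pass over all lookups, with at most maxInFlight of them running at once. */
    static int totalOnce(List<Callable<Integer>> lookups, int maxInFlight)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        try {
            int sum = 0;
            // invokeAll blocks until every lookup has finished or failed.
            for (Future<Integer> result : pool.invokeAll(lookups)) {
                sum += result.get(); // rethrows a lookup failure as ExecutionException
            }
            return sum;
        } finally {
            pool.shutdownNow();
        }
    }

    /** Re-runs the whole pass up to maxRetries times on IOException, otherwise returns fallback. */
    static int total(List<Callable<Integer>> lookups, int maxInFlight, int maxRetries, int fallback) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return totalOnce(lookups, maxInFlight);
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof IOException)) {
                    return fallback; // only network-style errors are retried
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return fallback;
            }
        }
        return fallback; // retries exhausted
    }

    public static void main(String[] args) {
        List<Callable<Integer>> lookups = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int lineTotal = i * 10; // stands in for price * count of one line item
            lookups.add(() -> lineTotal);
        }
        System.out.println(total(lookups, 5, 2, -1)); // prints 60
    }
}
```

As in the chain above, the retry re-runs the whole calculation rather than a single request, which is part of why the RxJava version stays so short.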

How do I replace AsyncTask with RxJava Observer?

Please try restructuring your code like this:

    Completable.fromAction(() -> myViewModel.insertDatabase(myRoomEntity))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(() -> Log.d("TAG", "Populating database Success"),
            throwable -> Log.d("TAG", throwable.toString()));

Considerations:

  1. If your myRoomEntity is not available before this whole construct gets subscribed, make sure you use defer: http://reactivex.io/documentation/operators/defer.html
  2. Your subscribe handlers run on the main thread; that is why you were getting a crash.
  3. If possible, avoid unnecessary just calls
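The first consideration comes down to when a value is read: while the chain is being built, or only once something subscribes. The following plain-JDK analogy (it deliberately uses Supplier rather than RxJava, and DeferSketch, eager and deferred are hypothetical names) sketches the difference between capturing a value up front, as just does, and looking it up lazily, as defer or fromCallable do.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Analogy only: Supplier stands in for an Rx source; no RxJava types are used here.
final class DeferSketch {

    /** Eager: the value is fixed when the pipeline is assembled (comparable to just(value)). */
    static Supplier<String> eager(String value) {
        return () -> value;
    }

    /** Deferred: the source is consulted only when a result is requested (comparable to defer). */
    static Supplier<String> deferred(Supplier<String> source) {
        return () -> source.get();
    }

    public static void main(String[] args) {
        // Stands in for myRoomEntity, which is not ready while the chain is built.
        AtomicReference<String> entity = new AtomicReference<>("not ready");

        Supplier<String> builtEagerly = eager(entity.get());
        Supplier<String> builtLazily = deferred(entity::get);

        entity.set("ready"); // the entity becomes available later

        System.out.println(builtEagerly.get()); // prints "not ready"
        System.out.println(builtLazily.get());  // prints "ready"
    }
}
```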

Replace AsyncTask with RxJava for Room Database

Try:

DAO:

@Insert
Completable insertItems(List<Items> items);

Repository:

    public void insertItems(List<Items> items) {
        prjDao.insertItems(items)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);
    }

Or, better:

Repository:

    public Completable insertItems(List<Items> items) {
        return prjDao.insertItems(items);
    }

And then, subscribe to the completable and handle subscribe callbacks where you actually call insertItems().

    insertItems(items)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(...);

In my opinion, the repository should only provide a bridge to the DB interface, and the caller should be the one to handle the subscribe callbacks, as each caller may want to handle them differently.

UPDATE

To use RxJava with Room, please check that you have all the needed dependencies in your build.gradle file:

implementation "androidx.room:room-runtime:[roomVersion]"
implementation "androidx.room:room-rxjava2:[roomVersion]"
annotationProcessor "androidx.room:room-compiler:[roomVersion]"

I currently use roomVersion 2.2.5

Here is a simple working demo of room + rxjava I just created, maybe you will find differences there:

https://github.com/phamtdat/RxRoomDemo

RxJava2 that acts like AsyncTask in Android

I would do something like this:

    Observable.fromArray(getPaths())
        .map(path -> copyFileToExternal(path))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(aInteger -> Log.i("test", "update UI"),
            throwable -> ShowFailAlertDialog(),
            () -> ShowSuccessAlertDialog());

It is usually a good idea to keep a handle on the subscription, so that you can stop the background task when you need to (for example, because the user left the Activity). For this purpose you can use subscribeWith instead of subscribe and pass it a ResourceObserver; what you get back is a Disposable.

    Disposable subscription = Observable.fromArray(getPaths())
        .map(path -> copyFileToExternal(path))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new ResourceObserver<Integer>() {
            @Override
            public void onNext(@NonNull Integer index) {
                Log.i("test", "update UI");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                ShowFailAlertDialog();
            }

            @Override
            public void onComplete() {
                ShowSuccessAlertDialog();
            }
        });

When you need to stop the task you can just call:

subscription.dispose();

Convert AsyncTask to RxJava

Try this.

    public void networkCall(final String urls) {
        Observable.fromCallable(new Func0<String>() {
            @Override
            public String call() {
                StringBuilder result = new StringBuilder();
                HttpURLConnection urlConnection = null;
                try {
                    URL url = new URL(urls);
                    urlConnection = (HttpURLConnection) url.openConnection();

                    InputStream inputStream = urlConnection.getInputStream();
                    InputStreamReader reader = new InputStreamReader(inputStream);

                    int data = reader.read();
                    while (data != -1) {
                        result.append((char) data);
                        data = reader.read();
                    }

                    StringBuilder message = new StringBuilder();
                    JSONObject jsonObject = new JSONObject(result.toString());

                    String weatherInfo = jsonObject.getString("weather");
                    Log.i("Weather content", weatherInfo);
                    JSONArray arr = new JSONArray(weatherInfo);

                    for (int i = 0; i < arr.length(); i++) {
                        JSONObject jsonPart = arr.getJSONObject(i);

                        String main = jsonPart.getString("main");
                        String description = jsonPart.getString("description");

                        // Compare string contents, not references.
                        if (!main.isEmpty() && !description.isEmpty()) {
                            message.append(main).append(": ").append(description).append("\r\n"); // line break
                        }
                    }

                    return message.toString();

                } catch (JSONException e) {
                    // This runs on the IO thread, so no Toast here; onNext reports the empty result.
                    e.printStackTrace();
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (urlConnection != null) {
                        urlConnection.disconnect();
                    }
                }
                // An empty string tells onNext that no weather was found.
                return "";
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String message) {
                if (!message.isEmpty()) {
                    resultTextview.setText(message);
                } else {
                    Toast.makeText(getApplicationContext(), "Could not find weather", Toast.LENGTH_LONG).show();
                }
            }
        });
    }

That said, I would recommend using Retrofit and RxJava together.

How do I replace AsyncTask with RxJava in Room methods?

Have the DAO method return a Completable:

    fun insert(contact: Contact): Completable

Then somewhere you call this method:

    contactDao.insert(contact)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.d("test", "insertcompleted")
            // do something when insert completed
        }

Or, if the DAO method is a plain blocking function, wrap the call in a Completable like this:

    fun insertCompletable(contact: Contact): Completable =
        Completable.fromAction { contactDao.insert(contact) }

and subscribe to it the same way as before.


