Use Cases for RxJava Schedulers

Use cases for RxJava schedulers

Great questions. I think the documentation could do with some more detail.

  1. io() is backed by an unbounded thread pool and is meant for tasks that are not computationally intensive, that is, work that puts little load on the CPU and spends most of its time waiting. So yes, interaction with the file system, with databases, or with services on a different host are good examples.
  2. computation() is backed by a bounded thread pool with size equal to the number of available processors. If you schedule CPU-intensive work in parallel across more threads than there are processors (say, using newThread()), you pay thread-creation overhead plus context-switching overhead as the threads vie for a processor, which is potentially a big performance hit.
  3. It's best to reserve computation() for CPU-intensive work only. Blocking work placed there ties up one of the few pool threads while it waits, so you won't get good CPU utilization.
  4. It's bad to use io() for computational work, for the reason discussed in 2. io() is unbounded, so if you schedule a thousand computational tasks on io() in parallel, each of them gets its own thread and they all compete for the CPU, incurring context-switching costs.
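
The difference between an unbounded and a bounded pool can be sketched with plain java.util.concurrent executors. This is only an analogy, not RxJava's actual implementation: the class PoolSizeSketch and its helper methods are hypothetical, a cached thread pool stands in for io(), and a fixed pool sized to the processor count stands in for computation().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizeSketch {

    // Submits `tasks` jobs and returns how many distinct threads ran them.
    // Every job that starts before the release holds its thread until all
    // jobs have been submitted, so no thread is free to be reused early.
    static int distinctThreads(ExecutorService pool, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        release.countDown();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return names.size();
    }

    // Stand-in for io(): an unbounded pool that grows with demand.
    static int unboundedCount(int tasks) {
        return distinctThreads(Executors.newCachedThreadPool(), tasks);
    }

    // Stand-in for computation(): a pool capped at the processor count.
    static int boundedCount(int tasks) {
        int cores = Runtime.getRuntime().availableProcessors();
        return distinctThreads(Executors.newFixedThreadPool(cores), tasks);
    }

    public static void main(String[] args) {
        System.out.println("unbounded pool threads: " + unboundedCount(100));
        System.out.println("bounded pool threads:   " + boundedCount(100));
    }
}
```

The unbounded pool ends up with one thread per concurrent task, which is harmless when the tasks mostly wait on I/O and costly when they all want the CPU. The bounded pool never exceeds the processor count and queues the rest.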

Use cases for RxJava?

Can anyone tell me what are the use cases for RxJava in Android?

First and foremost, many places where you have a background thread in Android, you can use RxJava to replace the thread (or AsyncTask or whatever). This includes:

  • Disk I/O
  • Network I/O
  • ContentResolver requests of content and document providers
  • Database access
  • And so on

Secondarily, you may be able to manage other incoming sources of data as event streams via RxJava, such as input events (see RxBinding).

In which components can I use RxJava in Android?

Few ContentProvider implementations will use RxJava, as they expose a synchronous API. Few BroadcastReceiver instances will live long enough for RxJava to be useful. So, mostly, RxJava will be in support of activities and services, though the actual RxJava code may reside in other classes used by activities and services (e.g., view-models, presenters, repositories).

Retrofit with RxJava Schedulers.newThread() vs Schedulers.io()

You are correct that the benefit of using Schedulers.io() lies in the fact that it uses a thread pool, whereas Schedulers.newThread() does not.

The primary reason that you should consider using thread pools is that they maintain a number of pre-created threads that are idle and waiting for work. This means that when you have work to be done, you don't need to go through the overhead of creating a thread. Once your work is done, that thread can also be re-used for future work instead of constantly creating and destroying threads.

Threads can be expensive to create, so minimizing the number of threads that you are creating on the fly is generally good.
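
As a rough illustration of thread reuse, the sketch below runs the same number of tasks once with a fresh thread per task and once on a pooled thread, then counts how many threads were involved. It uses plain java.util.concurrent rather than RxJava, and the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadReuseSketch {

    // Runs `tasks` jobs one after another, creating a new thread for each.
    static int threadsCreatedWithoutPool(int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < tasks; i++) {
                Thread t = new Thread(() -> { seen.add(Thread.currentThread()); });
                t.start();
                t.join(); // wait so the jobs run strictly in sequence
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    // Runs the same jobs on a single pooled thread that is reused each time.
    static int threadsUsedWithPool(int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.submit(() -> { seen.add(Thread.currentThread()); }).get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("threads without pool: " + threadsCreatedWithoutPool(20));
        System.out.println("threads with pool:    " + threadsUsedWithPool(20));
    }
}
```

Without a pool, every task pays for creating and tearing down its own thread. With the pool, the creation cost is paid once and the same thread serves every task.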

For more information on thread pools, I recommend:

  • What is the use of a Thread pool in Java?
  • What is a thread pool?
  • Thread pool pattern (Wikipedia)

How to test that observable uses correct schedulers in RxJava?

You can indirectly access both the thread an Observable is subscribed on and the thread it is observed on, which means you can in fact verify that the Observable uses the correct schedulers.

We're confined to verifying threads by name. Fortunately, threads used by Schedulers.io() and the other built-in schedulers are named with a consistent prefix that we can match against. Here's the (full?) list of schedulers that have unique prefixes, for reference:

  • Schedulers.io() - RxCachedThreadScheduler
  • Schedulers.newThread() - RxNewThreadScheduler
  • Schedulers.computation() - RxComputationThreadPool

To verify an Observable was subscribed to on the IO thread:

// Calling Thread.currentThread() inside Observable.OnSubscribe returns the
// thread the Observable is running on (Schedulers.io() in our case)
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
    s.onNext(Thread.currentThread().getName());
    s.onCompleted();
});

// Schedule the Observable
UseCase usecase = new UseCase(obs, Schedulers.immediate());
Observable<String> usecaseObservable = usecase.execute();

// Verify the Observable emitted the name of the IO thread
String subscribingThread = usecaseObservable.toBlocking().first();
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");

To verify an Observable was observed on the computation thread, you can use TestSubscriber#getLastSeenThread to access the last thread used for observing.

TestSubscriber<Object> subscriber = TestSubscriber.create();
UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation());
usecase.execute().subscribe(subscriber);

// The observable runs asynchronously, so wait for it to complete
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();

// Verify the observable was observed on the computation thread
String observingThread = subscriber.getLastSeenThread().getName();
assertThat(observingThread).startsWith("RxComputationThreadPool");

No third-party libraries or mocking are necessary, though I am using AssertJ for the fluent startsWith assertion.
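
The name-prefix check works because a thread pool can assign names to its threads through a thread factory. The following is a minimal stdlib-only sketch of that mechanism, not RxJava's own code: the prefix "SketchIoPool", the class ThreadNameSketch, and its methods are all made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadNameSketch {

    // Hypothetical factory that names each thread "<prefix>-<n>".
    static ThreadFactory prefixedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    // Runs one task on a pool built from the factory and returns the
    // name of the thread that executed it.
    static String workerThreadName(String prefix) {
        ExecutorService pool = Executors.newSingleThreadExecutor(prefixedFactory(prefix));
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String name = workerThreadName("SketchIoPool");
        // Same style of check as the scheduler tests: match on the prefix only.
        System.out.println(name + " matches prefix: " + name.startsWith("SketchIoPool"));
    }
}
```

Matching on the prefix rather than the full name keeps the check independent of which numbered thread in the pool happened to run the task.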


