Use cases for RxJava schedulers
Great questions, I think the documentation could do with some more detail.
1. io() is backed by an unbounded thread pool and is the sort of thing you'd use for tasks that aren't computationally intensive, that is, work that doesn't put much load on the CPU. So yes, interaction with the file system and interaction with databases or services on a different host are good examples.
2. computation() is backed by a bounded thread pool with size equal to the number of available processors. If you tried to schedule CPU-intensive work in parallel across more threads than there are available processors (say, using newThread()), you'd incur thread creation overhead and context-switching overhead as threads vie for a processor, and it's potentially a big performance hit.
3. It's best to leave computation() for CPU-intensive work only; otherwise you won't get good CPU utilization.
4. It's bad to call io() for computational work for the reason discussed in 2. io() is unbounded, and if you schedule a thousand computational tasks on io() in parallel, each of those thousand tasks will have its own thread and compete for CPU, incurring context-switching costs.
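To make the bounded vs. unbounded difference concrete, here is a minimal sketch using plain java.util.concurrent executors as stand-ins. It is an analogy, not RxJava's actual implementation: the class and method names (PoolGrowthSketch, distinctThreads, unbounded, bounded) are made up for illustration, and a cached thread pool is only assumed to behave like io() in the sense that it grows on demand.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not part of RxJava.
final class PoolGrowthSketch {

    // Submits `tasks` jobs that all stay busy until released, then reports
    // how many distinct threads the pool used to run them.
    static int distinctThreads(ExecutorService pool, int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                seen.add(Thread.currentThread());
                try {
                    release.await(); // keep this thread occupied
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        release.countDown(); // everything is submitted; let the tasks finish
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen.size();
    }

    // Unbounded pool: a new thread is created whenever no idle thread exists.
    static int unbounded(int tasks) {
        return distinctThreads(Executors.newCachedThreadPool(), tasks);
    }

    // Bounded pool: at most poolSize threads; extra tasks wait in a queue.
    static int bounded(int poolSize, int tasks) {
        return distinctThreads(Executors.newFixedThreadPool(poolSize), tasks);
    }
}
```

With 20 busy tasks, unbounded(20) ends up using 20 threads, while bounded(4, 20) never goes past 4 and queues the rest. That is the trade-off described in points 2 and 4: for blocking work the extra threads mostly sit waiting, but for CPU-bound work they compete for the same cores.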
Use cases for RxJava?
Can anyone tell me what are the use cases for RxJava in Android?
First and foremost, many places where you have a background thread in Android, you can use RxJava to replace the thread (or AsyncTask or whatever). This includes:
- Disk I/O
- Network I/O
- ContentResolver requests of content and document providers
- Database access
- And so on
Secondarily, you may be able to manage other incoming sources of data as event streams via RxJava, such as input events (see RxBinding).
In which components can I use RxJava in Android?
Few ContentProvider implementations will use RxJava, as they expose a synchronous API. Few BroadcastReceiver instances will live long enough for RxJava to be useful. So, mostly, RxJava will be in support of activities and services, though the actual RxJava code may reside in other classes used by activities and services (e.g., view-models, presenters, repositories).
Retrofit with RxJava: Schedulers.newThread() vs Schedulers.io()
You are correct that the benefit of using Schedulers.io() lies in the fact that it uses a thread pool, whereas Schedulers.newThread() does not.
The primary reason that you should consider using thread pools is that they maintain a number of pre-created threads that are idle and waiting for work. This means that when you have work to be done, you don't need to go through the overhead of creating a thread. Once your work is done, that thread can also be re-used for future work instead of constantly creating and destroying threads.
Threads can be expensive to create, so minimizing the number of threads that you are creating on the fly is generally good.
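As a rough illustration of the reuse argument, the sketch below runs the same small task several times, once through a pool and once with a fresh thread per task, and counts how many threads were involved. It uses only java.util.concurrent; the class and method names are hypothetical, and a single-thread executor is used purely to keep the result deterministic, not because Schedulers.io() works that way.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not part of RxJava or Retrofit.
final class ThreadReuseSketch {

    // Runs `tasks` jobs one after another on a pool; the worker is reused.
    static int threadsUsedByPool(int tasks) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        Runnable task = () -> seen.add(Thread.currentThread());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.submit(task).get(); // wait for each job before the next
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen.size();
    }

    // Runs `tasks` jobs one after another, creating a new thread for each.
    static int threadsUsedWithoutPool(int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        Runnable task = () -> seen.add(Thread.currentThread());
        try {
            for (int i = 0; i < tasks; i++) {
                Thread t = new Thread(task);
                t.start();
                t.join(); // the thread is discarded once its job is done
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }
}
```

Five jobs through the pool touch one thread; five jobs without it create and throw away five. The per-thread creation cost is what the pool saves you.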
For more information on thread pools, I recommend:
- What is the use of a Thread pool in Java?
- What is a thread pool?
- Thread pool pattern (Wikipedia)
How to test that observable uses correct schedulers in RxJava?
There is a way to indirectly access both the thread on which an observable is operating and the thread on which it is being observed, which means you can in fact verify the Observable uses the correct schedulers.
We're confined to verifying threads by name. Fortunately, threads used by Schedulers.io() are named with a consistent prefix that we can match against. Here's the (full?) list of schedulers that have unique prefixes for reference:
- Schedulers.io() - RxCachedThreadScheduler
- Schedulers.newThread() - RxNewThreadScheduler
- Schedulers.computation() - RxComputationThreadPool
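The idea of checking a thread by its name prefix can be tried out without RxJava at all. The sketch below is only an analogy under stated assumptions: the prefix is assigned by a hypothetical ThreadFactory rather than by an RxJava scheduler, and the class and method names are invented for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; the prefix is set here, not by RxJava.
final class ThreadNameCheckSketch {

    // Builds a pool whose threads are named prefix + running counter.
    static ExecutorService namedPool(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, prefix + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return Executors.newCachedThreadPool(factory);
    }

    // Runs a task on the pool and returns the name of the thread that ran it.
    static String nameOfWorkerThread(ExecutorService pool) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The captured name comes from inside the task, so it tells you which pool actually ran the work. Matching it with startsWith is the same check the RxJava tests below perform against the scheduler prefixes.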
To verify an Observable was subscribed to on the IO thread:
// Calling Thread.currentThread() inside Observable.OnSubscribe will return the
// thread the Observable is running on (Schedulers.io() in our case)
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
    s.onNext(Thread.currentThread().getName());
    s.onCompleted();
});
// Schedule the Observable
UseCase usecase = new UseCase(obs, Schedulers.immediate());
Observable<String> usecaseObservable = usecase.execute();
// Verify the Observable emitted the name of the IO thread
String subscribingThread = usecaseObservable.toBlocking().first();
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");
To verify an Observable was observed on the computation thread, you can use TestSubscriber#getLastSeenThread to access the last thread used for observing.
TestSubscriber<Object> subscriber = TestSubscriber.create();
UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation());
usecase.execute().subscribe(subscriber);
// The observable runs asynchronously, so wait for it to complete
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();
// Verify the observable was observed on the computation thread
String observingThread = subscriber.getLastSeenThread().getName();
assertThat(observingThread).startsWith("RxComputationThreadPool");
No third-party libraries or mocking are necessary, though I am using AssertJ for the fluent startsWith assertion.