Handle Paging with RxJava
You could model it recursively:
Observable<ApiResponse> getPageAndNext(int page) {
    return getResults(page)
        .concatMap(new Func1<ApiResponse, Observable<ApiResponse>>() {
            @Override
            public Observable<ApiResponse> call(ApiResponse response) {
                // Terminal case.
                if (response.next == null) {
                    return Observable.just(response);
                }
                return Observable.just(response)
                    .concatWith(getPageAndNext(response.next));
            }
        });
}
Then, to consume it,
getPageAndNext(0)
    .concatMap(new Func1<ApiResponse, Observable<ResponseObject>>() {
        @Override
        public Observable<ResponseObject> call(ApiResponse response) {
            return Observable.from(response.results);
        }
    })
    .subscribe(new Action1<ResponseObject>() { /** Do something with it */ });
That should get you a stream of ResponseObject
that will arrive in order, and most likely arrive in page-size chunks.
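If it helps to see the control flow without RxJava, here is a minimal library-free sketch of the same recursion. Apart from getPageAndNext, every name in it (the Page class, the in-memory FAKE_API map, allResults) is made up for illustration, and it only models the ordering of pages and items, not the asynchronous behaviour of the Observable version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Library-free model of the recursive paging idea; all names are hypothetical.
class RecursivePagingSketch {

    // One page of results plus the index of the next page (null on the last page).
    static final class Page {
        final List<String> results;
        final Integer next;

        Page(List<String> results, Integer next) {
            this.results = results;
            this.next = next;
        }
    }

    // Stand-in for the remote API: three pages held in memory.
    static final Map<Integer, Page> FAKE_API = Map.of(
            0, new Page(List.of("a", "b"), 1),
            1, new Page(List.of("c", "d"), 2),
            2, new Page(List.of("e"), null));

    // Take the requested page, then recurse into the next one if there is any.
    static List<Page> getPageAndNext(int page) {
        Page current = FAKE_API.get(page);
        List<Page> pages = new ArrayList<>();
        pages.add(current);
        if (current.next != null) {
            pages.addAll(getPageAndNext(current.next));
        }
        return pages;
    }

    // Flatten the pages into individual items, preserving page order.
    static List<String> allResults(int firstPage) {
        List<String> items = new ArrayList<>();
        for (Page p : getPageAndNext(firstPage)) {
            items.addAll(p.results);
        }
        return items;
    }

    public static void main(String[] args) {
        System.out.println(allResults(0)); // prints [a, b, c, d, e]
    }
}
```

The recursion depth grows with the number of pages, which is fine for a handful of pages but worth keeping in mind for very long result sets.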
Issue with Paging3 and Rxjava in MVP
I did it like this:
class RepoPagingSource @Inject constructor(
    private val repository: ApiRepository,
    val context: Context
) : RxPagingSource<Int, RepositoryResponse>() {

    private lateinit var sharedPref: SharedPref

    override fun loadSingle(params: LoadParams<Int>): Single<LoadResult<Int, RepositoryResponse>> {
        sharedPref = SharedPref(context)
        // Start from page 1 when no key is supplied (initial load).
        val nextPageNumber = params.key ?: 1
        return repository.getRepositories("bearer ${sharedPref.accessToken}", nextPageNumber)
            .subscribeOn(Schedulers.io())
            .map { response: Response<MutableList<RepositoryResponse>> -> response.body()?.let { toLoadResult(it, nextPageNumber) } }
            .onErrorReturn { LoadResult.Error(it) }
    }

    private fun toLoadResult(
        response: MutableList<RepositoryResponse>,
        position: Int
    ): LoadResult<Int, RepositoryResponse> {
        return LoadResult.Page(
            response,
            null,          // no previous key: paging only moves forward
            position + 1,  // key for the next page
            COUNT_UNDEFINED,
            COUNT_UNDEFINED
        )
    }

    override fun getRefreshKey(state: PagingState<Int, RepositoryResponse>): Int? {
        return null
    }
}
This works for me. I also switched my Rx library version to RxJava 2.
How Can We Use RxJava for Paginated Web Service Calls, Where Each Page Depends on Previous Page Responses, Without Recursion?
If the web service API is blocking, or you are willing to block, then the solution is easy:
Observable.generate(() -> new ApiResponse(page), (s, emitter) -> {
    ApiResponse r = getResults(s.next);
    emitter.onNext(r);
    if (r.next == null) emitter.onComplete();
    return r;
});
This uses the notation from the recursive answer.
If blocking is not desirable, you can use FlowableTransformers.expand
from RxJava2Extensions, like so:
Flowable
.just(new ApiResponse(page))
.compose(FlowableTransformers.expand(r -> r.next == null ? Flowable.empty() : getResults(r.next)));
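To make the "expand" idea concrete without pulling in any library, here is a small plain-Java sketch of the emission order as I understand it: the seed item is emitted first, then each emitted item is mapped to its successor until the mapping yields nothing. The expand helper and the nextPage rule below are hypothetical stand-ins, not the RxJava2Extensions API, and the sketch is synchronous, so it says nothing about threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Library-free model of "emit the seed, then keep expanding"; names are hypothetical.
class ExpandSketch {

    // Emit the seed, then keep applying `next` to the latest item until it yields nothing.
    static <T> List<T> expand(T seed, Function<T, Optional<T>> next) {
        List<T> emitted = new ArrayList<>();
        Optional<T> current = Optional.of(seed);
        while (current.isPresent()) {
            emitted.add(current.get());
            current = next.apply(current.get());
        }
        return emitted;
    }

    // Hypothetical paging rule: page n points at n + 1, and page 3 is the last one.
    static Optional<Integer> nextPage(Integer n) {
        return n < 3 ? Optional.of(n + 1) : Optional.empty();
    }

    public static void main(String[] args) {
        List<Integer> pages = expand(0, ExpandSketch::nextPage);
        System.out.println(pages); // prints [0, 1, 2, 3]
    }
}
```

One consequence worth checking in your own code: if the seed is a placeholder rather than a real response, it will presumably show up in the output as well, so you may want to skip the first item downstream.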
RxJava and Retrofit handle pagination with multiple flatmaps
As you mentioned in the comments, your problem is that the third flatMap should only be called after the second flatMap has finished.
So I'd like to propose a solution like this:
mService.loginUser(email, password)
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    .flatMap(tokenResponse -> {
        mService.saveUserToken(tokenResponse.body());
        return mService.getUserProfile(mService.getToken());
    })
    .flatMap(profileResponse -> {
        mService.saveUserProfile(profileResponse.body());
        return mService.getUserFriendsAndNext(mService.getToken(), FIRST_PAGE, TAKE_PERPAGE);
    })
    // Do all your work with your friends in this operator
    .doOnNext(friendResponse -> mService.handleFriendPaging(friendResponse.body().getData()))
    // This returns a Completable
    .ignoreElements()
    // Receiving this signal means the upstream work is done
    .andThen(mService.getConversationsAndNext(mService.getToken(), FIRST_PAGE, TAKE_PERPAGE))
    .subscribe(new Observer<Response<ConvResult>>() {
        @Override
        public void onSubscribe(Disposable d) {
            mDisposables.add(d);
        }

        @Override
        public void onNext(Response<ConvResult> convResponse) {
            mService.handleConvPaging(convResponse.body().getData());
        }

        @Override
        public void onError(Throwable e) {
            // Handle error
        }

        @Override
        public void onComplete() {
            // This is not strictly necessary
            mService.saveAllFriends();
        }
    });
So you keep a single stream and get your work done.
ignoreElements() ensures that you only receive the terminal events (onComplete() and onError()). andThen() subscribes to the upstream Completable and, once it completes, continues by emitting items from the source you pass to andThen().
I tested something like this:
Observable.just("A", "B", "C")
    .flatMap(x -> Observable.error(new Throwable("Eorror")))
    .doOnNext(x -> System.out.println(x))
    .ignoreElements()
    .andThen(Observable.just("New D"))
    .subscribe(x -> System.out.println("onNext" + x),
        error -> System.out.println("onError" + error),
        () -> System.out.println("END"));
and it prints only
System.out: onErrorjava.lang.Throwable: Eorror
with RxJava version 2.0.9.
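The same "the follow-up never starts if the first stage fails" behaviour can be sketched with the JDK alone. This is only an analogy using CompletableFuture.thenCompose, not RxJava: the class name, the run helper and the flag are hypothetical, and the point is just that the continuation is skipped when the upstream ends with an error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only analogy for ignoreElements().andThen(...); all names are hypothetical.
class AndThenAnalogy {

    // Runs `first`, then starts the follow-up stage only if `first` completed normally.
    static String run(CompletableFuture<Void> first, AtomicBoolean followUpStarted) {
        try {
            return first.thenCompose(ignored -> {
                followUpStarted.set(true); // reached only on normal completion
                return CompletableFuture.completedFuture("New D");
            }).join();
        } catch (CompletionException e) {
            // The upstream failure is propagated; the follow-up was never started.
            return "onError " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("Eorror"));
        AtomicBoolean started = new AtomicBoolean(false);
        System.out.println(run(failed, started)); // prints onError Eorror
        System.out.println(started.get());        // prints false
    }
}
```

With a first stage that completes normally, run returns "New D" and the flag is set, which corresponds to the stream continuing with the source passed to andThen().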