Handle Paging with RxJava

You could model it recursively:

    // Emits the requested page, then every following page, in order.
    Observable<ApiResponse> getPageAndNext(int page) {
        return getResults(page)
            .concatMap(new Func1<ApiResponse, Observable<ApiResponse>>() {
                @Override
                public Observable<ApiResponse> call(ApiResponse response) {
                    // Terminal case: no next page, so emit this one and complete.
                    if (response.next == null) {
                        return Observable.just(response);
                    }
                    // Emit this page, then continue with the next one.
                    return Observable.just(response)
                        .concatWith(getPageAndNext(response.next));
                }
            });
    }

Then, to consume it:

    getPageAndNext(0)
        .concatMap(new Func1<ApiResponse, Observable<ResponseObject>>() {
            @Override
            public Observable<ResponseObject> call(ApiResponse response) {
                // Flatten each page into its individual results.
                return Observable.from(response.results);
            }
        })
        .subscribe(new Action1<ResponseObject>() {
            @Override
            public void call(ResponseObject result) {
                // Do something with it
            }
        });

That should get you a stream of ResponseObject items that arrive in order, most likely in page-size chunks.
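
To check the ordering claim without a network or RxJava on the classpath, here is a rough plain-Java analogue built on java.util.stream. It is only a sketch: the ApiResponse class, the three fake pages, and the String result type are made up for illustration, and a Stream stands in for the Observable (so pages are fetched eagerly here, not lazily).

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RecursivePagingSketch {

    // Hypothetical stand-in for the ApiResponse type used above.
    static final class ApiResponse {
        final List<String> results;
        final Integer next; // null on the last page

        ApiResponse(List<String> results, Integer next) {
            this.results = results;
            this.next = next;
        }
    }

    // Fake backend with three pages; the real getResults would hit the network.
    static final Map<Integer, ApiResponse> PAGES = Map.of(
        0, new ApiResponse(List.of("a", "b"), 1),
        1, new ApiResponse(List.of("c", "d"), 2),
        2, new ApiResponse(List.of("e"), null));

    static ApiResponse getResults(int page) {
        return PAGES.get(page);
    }

    // Same shape as getPageAndNext above: this page, then the rest if any.
    // Unlike the Observable version, this fetches every page eagerly.
    static Stream<ApiResponse> getPageAndNext(int page) {
        ApiResponse response = getResults(page);
        if (response.next == null) {
            return Stream.of(response);
        }
        return Stream.concat(Stream.of(response), getPageAndNext(response.next));
    }

    // Flatten the pages into individual results, like the concatMap consumer.
    static List<String> allResults() {
        return getPageAndNext(0)
            .flatMap(response -> response.results.stream())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(allResults()); // prints [a, b, c, d, e]
    }
}
```

The results come out page by page and in order within each page, which is the behaviour concatMap preserves in the Rx version.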

Issue with Paging 3 and RxJava in MVP

I did it like this:

    class RepoPagingSource @Inject constructor(
        private val repository: ApiRepository,
        val context: Context
    ) : RxPagingSource<Int, RepositoryResponse>() {

        private lateinit var sharedPref: SharedPref

        override fun loadSingle(params: LoadParams<Int>): Single<LoadResult<Int, RepositoryResponse>> {
            sharedPref = SharedPref(context)

            // Start from page 1 when no key has been supplied yet.
            val nextPageNumber = params.key ?: 1

            return repository.getRepositories("bearer ${sharedPref.accessToken}", nextPageNumber)
                .subscribeOn(Schedulers.io())
                .map<LoadResult<Int, RepositoryResponse>> { response ->
                    // Report a missing body as an error instead of returning null from map.
                    response.body()?.let { toLoadResult(it, nextPageNumber) }
                        ?: LoadResult.Error(IllegalStateException("Empty response body"))
                }
                .onErrorReturn { LoadResult.Error(it) }
        }

        private fun toLoadResult(
            response: MutableList<RepositoryResponse>,
            position: Int
        ): LoadResult<Int, RepositoryResponse> {
            return LoadResult.Page(
                response,
                null,          // prevKey: paging forward only
                position + 1,  // nextKey
                COUNT_UNDEFINED,
                COUNT_UNDEFINED
            )
        }

        override fun getRefreshKey(state: PagingState<Int, RepositoryResponse>): Int? {
            return null
        }
    }

This works for me. I also switched my Rx library version to RxJava 2.

How Can We Use RxJava for Paginated Web Service Calls, Where Each Page Depends on Previous Page Responses, Without Recursion?

If the web service API is blocking, or you are willing to block, the solution is easy:

    Observable.generate(() -> new ApiResponse(page), (s, emitter) -> {
        ApiResponse r = getResults(s.next);
        emitter.onNext(r);
        if (r.next == null) emitter.onComplete();
        return r;
    });

This uses the notation from the recursive answer, except that getResults here is a blocking call that returns an ApiResponse directly.
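
The same "state in, next state out, stop when next is null" idea can be tried without RxJava. The sketch below is a plain-Java stand-in using the three-argument Stream.iterate (Java 9+), not the Observable.generate call itself; the ApiResponse class, the fake pages, and the String result type are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IterativePagingSketch {

    // Hypothetical stand-in for the ApiResponse type from the recursive answer.
    static final class ApiResponse {
        final List<String> results;
        final Integer next; // null on the last page

        ApiResponse(List<String> results, Integer next) {
            this.results = results;
            this.next = next;
        }
    }

    // Fake blocking backend; each page names the page that follows it.
    static final Map<Integer, ApiResponse> PAGES = Map.of(
        1, new ApiResponse(List.of("r1", "r2"), 2),
        2, new ApiResponse(List.of("r3"), 3),
        3, new ApiResponse(List.of("r4", "r5"), null));

    static ApiResponse getResults(int page) {
        return PAGES.get(page);
    }

    // No recursion: each step derives the next page from the previous response,
    // and the stream ends once a response has no next page.
    static List<String> allResults(int firstPage) {
        return Stream.iterate(
                getResults(firstPage),
                response -> response != null,
                response -> response.next == null ? null : getResults(response.next))
            .flatMap(response -> response.results.stream())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(allResults(1)); // prints [r1, r2, r3, r4, r5]
    }
}
```

Because each fetch happens inside the iteration step, the caller's thread blocks on every page, which is the trade-off the generate approach accepts.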

If blocking is not desirable, you can use FlowableTransformers.expand from RxJava2Extensions, like so:

    Flowable
        .just(new ApiResponse(page))
        .compose(FlowableTransformers.expand(r -> r.next == null ? Flowable.empty() : getResults(r.next)));

RxJava and Retrofit handle pagination with multiple flatmaps

As you mentioned in the comments, your problem is that the third flatMap should only run after the second flatMap has finished.
So I'd like to propose a solution like this:

    mService.loginUser(email, password)
        .subscribeOn(mSchedulerProvider.io())
        .observeOn(mSchedulerProvider.ui())
        .flatMap(tokenResponse -> {
            mService.saveUserToken(tokenResponse.body());
            return mService.getUserProfile(mService.getToken());
        })
        .flatMap(profileResponse -> {
            mService.saveUserProfile(profileResponse.body());
            return mService.getUserFriendsAndNext(mService.getToken(), FIRST_PAGE, TAKE_PERPAGE);
        })
        // Do all your work with your friends in this operator.
        .doOnNext(friendResponse -> mService.handleFriendPaging(friendResponse.body().getData()))
        // This returns a Completable.
        .ignoreElements()
        // Receiving the completion signal means the upstream work is done.
        .andThen(mService.getConversationsAndNext(mService.getToken(), FIRST_PAGE, TAKE_PERPAGE))
        .subscribe(new Observer<Response<ConvResult>>() {
            @Override
            public void onSubscribe(Disposable d) {
                mDisposables.add(d);
            }

            @Override
            public void onNext(Response<ConvResult> convResponse) {
                mService.handleConvPaging(convResponse.body().getData());
            }

            @Override
            public void onError(Throwable e) {
                // Handle the error.
            }

            @Override
            public void onComplete() {
                // This is not strictly necessary.
                mService.saveAllFriends();
            }
        });

So you keep one single stream and get your work done.

ignoreElements() drops all items so that you only receive the terminal events (onComplete() and onError()). andThen() subscribes to the upstream Completable and, once it completes, emits the items from the source you pass to andThen().
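
If it helps to see the "finish the first step, discard its value, then start the second" ordering in isolation, here is a loose analogy using only java.util.concurrent.CompletableFuture. This is not RxJava and a future carries a single value, not a stream; the class name, the log strings, and the already-completed futures are all made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AndThenSketch {

    // Runs a first step, ignores its value, and starts the second step
    // only if the first one completed normally. Returns what happened.
    static List<String> run(boolean failFirst) {
        List<String> log = new ArrayList<>();

        CompletableFuture<String> first = new CompletableFuture<>();
        if (failFirst) {
            first.completeExceptionally(new IllegalStateException("Error"));
        } else {
            first.complete("friends");
        }

        first
            // Side effect on the value, roughly like doOnNext.
            .thenAccept(value -> log.add("handled " + value))
            // The value is gone (Void); continue with a new source,
            // roughly like ignoreElements().andThen(...).
            .thenCompose(ignored -> {
                log.add("second started");
                return CompletableFuture.completedFuture("conversations");
            })
            .whenComplete((value, error) -> {
                if (error != null) {
                    log.add("onError " + unwrap(error).getMessage());
                } else {
                    log.add("onNext " + value);
                }
            });

        return log;
    }

    // Dependent stages wrap the original failure in a CompletionException.
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In the failing case the second step never starts and only the error is reported; in the success case the second step starts only after the first has been handled.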

I tested something like this:

    Observable.just("A", "B", "C")
        .flatMap(x -> Observable.error(new Throwable("Error")))
        .doOnNext(x -> System.out.println(x))
        .ignoreElements()
        .andThen(Observable.just("New D"))
        .subscribe(x -> System.out.println("onNext" + x),
            error -> System.out.println("onError" + error),
            () -> System.out.println("END"));

and it prints only

    System.out: onErrorjava.lang.Throwable: Error

with RxJava version 2.0.9. The error skips the Observable passed to andThen(), so neither "New D" nor "END" is printed.


