Retrofit and RxJava: How to Combine Two Requests and Get Access to Both Results

Retrofit and RxJava: How to combine two requests and get access to both results?

As I understand it, you need to make a request based on the result of another request and then combine both results. For that purpose you can use this flatMap overload: Observable.flatMap(Func1 collectionSelector, Func2 resultSelector)

It returns an Observable that emits the results of applying a specified function to each pair of values emitted by the source Observable and the collection Observable.

A simple example to show how you could rewrite your code:

private Observable<String> makeRequestToServiceA() {
return Observable.just("serviceA response"); //some network call
}

private Observable<String> makeRequestToServiceB(String serviceAResponse) {
return Observable.just("serviceB response"); //some network call based on response from ServiceA
}

private void doTheJob() {
    makeRequestToServiceA()
        .flatMap(new Func1<String, Observable<? extends String>>() {
            @Override
            public Observable<? extends String> call(String responseFromServiceA) {
                // make the second request based on the response from ServiceA
                return makeRequestToServiceB(responseFromServiceA);
            }
        }, new Func2<String, String, String>() {
            @Override
            public String call(String responseFromServiceA, String responseFromServiceB) {
                // combine the results; the selector returns the combined value itself,
                // not an Observable (that would give Observable<Observable<String>>)
                return "here is combined result!";
            }
        })
        //apply schedulers, subscribe etc
}

Using lambdas:

private void doTheJob() {
    makeRequestToServiceA()
        .flatMap(responseFromServiceA -> makeRequestToServiceB(responseFromServiceA),
                 // the second lambda returns the combined value directly
                 (responseFromServiceA, responseFromServiceB) -> "here is combined result!")
        //...
}
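The snippets above use the RxJava 1 types (Func1, Func2). As a rough sketch of the same idea in RxJava 2, where the result selector is a BiFunction, something like the following should work. The names CombineDemo, requestA and requestB are made up for illustration and stand in for real network calls; it assumes RxJava 2 (io.reactivex) is on the classpath.

```java
import io.reactivex.Observable;

final class CombineDemo {
    // Hypothetical stand-in for the first network call.
    static Observable<String> requestA() {
        return Observable.just("A");
    }

    // Hypothetical stand-in for the second call, which depends on the first response.
    static Observable<String> requestB(String responseA) {
        return Observable.just("B(after " + responseA + ")");
    }

    // The second lambda sees both responses and returns the combined value itself.
    static String combined() {
        return requestA()
                .flatMap(a -> requestB(a), (a, b) -> a + " + " + b)
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(combined());
    }
}
```

blockingFirst() is only there to make the sketch runnable from main; in an app you would apply schedulers and subscribe instead.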

RxJava Combine Sequence Of Requests

tl;dr use concatMapEager or flatMap and execute the sub-calls asynchronously or on a scheduler.


long story

I'm not an Android developer, so my answer will be limited to pure RxJava (version 1 and version 2).

If I get the picture right, the needed flow is:

some query param 
\--> Execute query on API_1 -> list of items
|-> Execute query for item 1 on API_2 -> extended info of item 1
|-> Execute query for item 2 on API_2 -> extended info of item 2
|-> Execute query for item 3 on API_2 -> extended info of item 3
...
\-> Execute query for item n on API_2 -> extended info of item n
\----------------------------------------------------------------------/
|
\--> stream (or list) of extended item info for the query param

Assuming Retrofit generated the clients for

interface Api1 {
@GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

If the order of the items is not important, then flatMap alone is enough:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList))
.flatMap(item -> api2.extendedInfo(item.id()))
.subscribe(...)

But only if the Retrofit builder is configured in one of these two ways:

  • With the async adapter (calls are queued in OkHttp's internal executor). I personally think this is not a good idea, because you don't have control over this executor.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync())
  • With the scheduler-based adapter (calls are scheduled on an RxJava scheduler). This would be my preferred option, because you explicitly choose which scheduler is used; it will most likely be the IO scheduler, but you are free to try a different one.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))

The reason is that flatMap subscribes to each observable created by api2.extendedInfo(...) and merges them into the resulting observable, so results appear in the order they are received.

If the Retrofit client is not set to be async or to run on a scheduler, it is possible to set one locally:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList))
.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)

This structure is almost identical to the previous one, except that it indicates locally on which scheduler each api2.extendedInfo call is supposed to run.

It is possible to tune the maxConcurrency parameter of flatMap to control how many requests are performed at the same time. I'd be cautious with this one, though: you usually don't want to run all queries at once. For Flowable the default maxConcurrency (128) is usually good enough; note that in RxJava 2 Observable.flatMap is unbounded by default, so pass an explicit limit there if you need one.
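As a minimal sketch of capping concurrency, assuming RxJava 2 is on the classpath: fetchExtendedInfo below is a hypothetical stand-in for api2.extendedInfo that just sleeps for about 50 ms, and the two counters only exist to observe how many simulated requests overlap.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class MaxConcurrencyDemo {
    // Number of simulated requests currently running, and the highest value seen.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical stand-in for api2.extendedInfo(id): a blocking call of about 50 ms.
    static Observable<String> fetchExtendedInfo(String id) {
        return Observable.fromCallable(() -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                TimeUnit.MILLISECONDS.sleep(50);
                return "extended-" + id;
            } finally {
                inFlight.decrementAndGet();
            }
        });
    }

    // flatMap keeps at most maxConcurrency inner observables subscribed at a time.
    static List<String> fetchAll(List<String> ids, int maxConcurrency) {
        return Observable.fromIterable(ids)
                .flatMap(id -> fetchExtendedInfo(id).subscribeOn(Schedulers.io()), maxConcurrency)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        List<String> result = fetchAll(Arrays.asList("1", "2", "3", "4", "5", "6"), 2);
        System.out.println(result.size() + " results, peak in flight: " + peak.get());
    }
}
```

The order of the resulting list is not guaranteed here, since flatMap emits results as they arrive.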

Now, if the order of the original query matters: concatMap does the same thing as flatMap but in order and sequentially, which turns out to be slow because each sub-query has to wait for the previous one to finish. The solution is one step further, with concatMapEager: it subscribes to the observables eagerly and buffers the results as needed so that they are emitted in order.

Assuming the Retrofit clients are async or run on a specific scheduler:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList))
.concatMapEager(item -> api2.extendedInfo(item.id()))
.subscribe(...)

Or, if the scheduler has to be set locally:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList))
.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)

It is also possible to tune the concurrency in this operator.
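A small sketch of that, assuming RxJava 2: the concatMapEager overload that takes maxConcurrency and prefetch. Here fetchExtendedInfo is a hypothetical stand-in for api2.extendedInfo in which earlier items deliberately answer more slowly, so a plain merge would tend to deliver them last.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class EagerOrderDemo {
    // Hypothetical stand-in for api2.extendedInfo: the lower the id, the longer the delay.
    static Observable<String> fetchExtendedInfo(int id, int total) {
        return Observable.just("extended-" + id)
                .delay((total - id) * 30L, TimeUnit.MILLISECONDS, Schedulers.io());
    }

    // Subscribes to up to maxConcurrency inner observables eagerly,
    // but emits their results in the order of the source items.
    static List<String> fetchInOrder(List<Integer> ids, int maxConcurrency) {
        int total = ids.size();
        return Observable.fromIterable(ids)
                .concatMapEager(id -> fetchExtendedInfo(id, total),
                        maxConcurrency, Observable.bufferSize())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fetchInOrder(Arrays.asList(1, 2, 3, 4), 2));
    }
}
```

Even though item 4 answers first in this simulation, the list comes back in source order, because concatMapEager buffers the early results until their turn.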


Additionally, if the API returns a Flowable, it is possible to use .parallel, which is still in beta at this time (RxJava 2.1.7). But then the results are not in order, and I don't know a way (yet?) to order them without sorting afterwards.

api.items(queryParam) // Flowable<Item>
    .parallel(10)
    .runOn(Schedulers.io())
    // flatMap rather than map, assuming extendedInfo returns a Flowable here
    .flatMap(item -> api2.extendedInfo(item.id()))
    .sequential(); // Flowable<ItemExtended>
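For what it's worth, a sketch of the "sort afterwards" approach, assuming RxJava 2: extendedInfo here is a hypothetical Flowable-returning stand-in that just multiplies the id by 10, and the results are sorted by natural order once the parallel rails are merged back.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ParallelSortDemo {
    // Hypothetical stand-in for a Flowable-returning api2.extendedInfo.
    static Flowable<Integer> extendedInfo(int id) {
        return Flowable.just(id * 10);
    }

    static List<Integer> fetchSorted(List<Integer> ids) {
        return Flowable.fromIterable(ids)
                .parallel(4)                        // split the work into 4 rails
                .runOn(Schedulers.io())             // each rail runs on an IO thread
                .flatMap(id -> extendedInfo(id))    // one sub-query per item
                .sequential()                       // merge the rails, arrival order
                .sorted(Comparator.naturalOrder())  // restore a deterministic order
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fetchSorted(Arrays.asList(3, 1, 2)));
    }
}
```

Note that this restores the sort-key order, not the original request order; to get the latter you would have to carry the original index along with each result and sort on that.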

retrofit2 and rxjava - how to combine two dependent network calls in android

I'm not sure I understand correctly, but from the first call you receive a list of Business objects, and then for each of those you need to get its reviews. If so, then you can use the following code:

// retrofit is your configured Retrofit instance (create() is an instance method)
ApiService service = retrofit.create(ApiService.class);
service.getBusinessesRx("delis", "37.786882", "-122.399972")
.map(Businesses::getBusinesses)
.flatMap(Observable::from)
.map(Business::getId)
.flatMap(service::getReviewsRx)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(businesses -> updateUi());

RxJava - Combine 2 calls

You could utilize this flatMap overload:

Api.getCars()
    .flatMap(cars -> Observable.from(cars)) // flatten your list
    .flatMap(car -> Api.getOwner(car), // request each owner
        (car, owner) -> {
            car.setOwner(owner); // assign owner to the car
            return car; // the selector returns the value itself, not an Observable
        })
    .toList() // collect items into List
    ... // here is your Observable<List<Car>>

Multiple requests with retrofit to combine results

You can use RxJava and combine results like this: Combining API calls with RX Java

Here is a tutorial on how to use RxJava with Retrofit 1.9: http://randomdotnext.com/retrofit-rxjava/

RxJava: how to make sequential calls while being able to access the previous parameter

If you don't need the result, you can use ignoreElement() to convert your flow into a Completable and then toSingleDefault(...) to emit a default value:

uploadData(apiResponse.getUrl())
.ignoreElement()
.toSingleDefault(new FinalResult());

If you just need to convert the response into a FinalResult, you can use map(...):

uploadData(apiResponse.getUrl())
.map(uploadResponse -> new FinalResult(uploadResponse));

If you have to use the result of uploadData(..) in further calls, then flatMap() is your choice:

uploadData(apiResponse.getUrl())
.flatMap(uploadResponse -> {
// do whatever you want with response
return Single.just(new FinalResult(uploadResponse));
});

UPDATE:

In your case it can be simplified:

return processData(fileData)
    .flatMap(processedData -> {
        Single<FinalResult> postProcessing;
        if (processedData.length > LIMIT) {
            postProcessing = uploadData(apiResponse.getUrl())
                .map(response -> new FinalResult(response));
        } else {
            postProcessing = Single.just(new FinalResult());
        }
        return callAPI(id, processedData)
            .ignoreElement()
            .andThen(postProcessing);
    });

