Chaining Observables in RxJS

Promise composition versus RxJS is a frequently asked question, so you can refer to several earlier questions on SO, among them:

  • How to do the chain sequence in rxjs
  • RxJS Promise Composition (passing data)
  • RxJS sequence equvalent to promise.then()?

Basically, flatMap (an alias of mergeMap, which is the name to use in current RxJS versions) is the equivalent of Promise.then.
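
To make the comparison with Promise.then concrete, here is a minimal sketch using mergeMap (the operator that flatMap is an alias for). The fetchUser and fetchOrders functions are hypothetical stand-ins for real requests and are not taken from the question.

```typescript
import { Observable, of } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

// Hypothetical lookups standing in for HTTP calls.
const fetchUser = (id: number): Observable<{ id: number; name: string }> =>
  of({ id, name: "user" + id });
const fetchOrders = (userName: string): Observable<string[]> =>
  of([userName + "-order-1", userName + "-order-2"]);

// With promises this would read:
//   fetchUser(1).then(user => fetchOrders(user.name)).then(orders => orders.length)
const orderCount$ = fetchUser(1).pipe(
  mergeMap(user => fetchOrders(user.name)), // like a .then() that returns another promise
  map(orders => orders.length)              // like a .then() that returns a plain value
);

const results: number[] = [];
orderCount$.subscribe(count => results.push(count));
console.log(results);
```

Unlike a promise, nothing runs here until subscribe is called, and the inner observable may emit more than once.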

For your second question: do you want to replay values that were already emitted, or only process new values as they arrive? In the first case, check the publishReplay operator. In the second case, a standard subscription is enough. However, you may need to be aware of the cold vs. hot dichotomy, depending on your source (cf. Hot and Cold observables: are there 'hot' and 'cold' operators? for an illustrated explanation of the concept).
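
The difference between replaying and only receiving new values can be sketched as follows. Note the substitution: this sketch uses shareReplay rather than publishReplay, because the publish family is marked deprecated in RxJS 7; the Subject and the buffer size of 2 are assumptions chosen for illustration.

```typescript
import { Subject } from "rxjs";
import { shareReplay } from "rxjs/operators";

// A hot source: values are pushed whether or not anyone is listening.
const source = new Subject<number>();
const replayed$ = source.pipe(shareReplay(2)); // keep the last 2 values for late subscribers

const early: number[] = [];
const lateWithReplay: number[] = [];
const lateWithoutReplay: number[] = [];

replayed$.subscribe(v => early.push(v)); // first subscriber connects to the source
source.next(1);
source.next(2);
source.next(3);

replayed$.subscribe(v => lateWithReplay.push(v)); // immediately receives the buffered 2 and 3
source.subscribe(v => lateWithoutReplay.push(v)); // plain subscription: new values only
source.next(4);

console.log(early, lateWithReplay, lateWithoutReplay);
```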

How do I chain Angular observables?

You have multiple nested subscriptions. They lead to multiple open subscriptions which may never be closed. Instead, use RxJS operators to reduce the chain to a single subscription.

Since you need to trigger two requests in parallel, you could also use the RxJS forkJoin function.

Refer here for a quick run down.

In short

  • switchMap operator to map from one observable to another
  • filter operator to continue the operator chain based on a condition
  • forkJoin to combine and trigger multiple observables in parallel

Try the following

import { Observable, forkJoin, timer } from 'rxjs';
import { filter, switchMap } from 'rxjs/operators';

ngOnInit(): void {
  this.subscription = timer(0, 120 * 1000).pipe(
    switchMap(() => this.shopService.getPreferencesAsObservable()),
    filter(preferences => !!preferences), // emit only if `preferences` is defined and truthy
    switchMap(() =>
      forkJoin({
        slots: this.getInitialPendingOrders(true),
        noSlots: this.getInitialPendingOrders(false)
      })
    )
  ).subscribe({
    next: ({ slots, noSlots }) => {
      // do stuff with orders from `slots` and `noSlots` responses
    },
    error: (error: any) => {
      // handle error
    }
  });
}

getInitialPendingOrders(slots: boolean): Observable<any> {
  // the ternary must be parenthesized: `+` binds tighter than `?:`
  return this.apiService.fetchShopOrders("status=PENDING" + (slots ? "&only_slots=true" : ''));
}

Update

As a rule of thumb, you should return the observable and subscribe only where its response is required. In your case you could pipe a switchMap into each argument of the forkJoin and return an observable conditionally. When there is nothing further to fetch, you can return the RxJS constant EMPTY. Note that forkJoin emits only once all of its source observables have completed, and if any source completes without emitting a value (as EMPTY does), forkJoin completes without emitting anything; return something like of(null) instead if the subscriber should still receive the other result.

import { EMPTY, Observable, forkJoin, timer } from 'rxjs';
import { filter, switchMap } from 'rxjs/operators';

ngOnInit(): void {
  this.subscription = timer(0, 120 * 1000).pipe(
    switchMap(() => this.shopService.getPreferencesAsObservable()),
    filter(preferences => !!preferences), // emit only if `preferences` is defined and truthy
    switchMap(() =>
      forkJoin({
        slots: this.getInitialPendingOrders(true).pipe(
          switchMap((orders: any) => {
            /* do stuff with orders */
            // EMPTY: no next page, so this branch completes without a value
            return orders.next_page ? this.getNextSlotsSetOfPendingOrders() : EMPTY;
          })
        ),
        noSlots: this.getInitialPendingOrders(false).pipe(
          switchMap((orders: any) => {
            /* do stuff with orders */
            return orders.next_page ? this.getNextNoSlotsSetOfPendingOrders() : EMPTY;
          })
        )
      })
    )
  ).subscribe({
    next: ({ slots, noSlots }) => {
      // do stuff with next set of orders from `slots` and `noSlots`
    },
    error: (error: any) => {
      // handle error
    }
  });
}

getInitialPendingOrders(slots: boolean): Observable<any> {
  // the ternary must be parenthesized: `+` binds tighter than `?:`
  return this.apiService.fetchShopOrders("status=PENDING" + (slots ? "&only_slots=true" : ''));
}

getNextSlotsSetOfPendingOrders(): Observable<any> {
  return this.apiService.fetchNextSetOfOrders(this.nextSlotsUrl);
}

getNextNoSlotsSetOfPendingOrders(): Observable<any> {
  return this.apiService.fetchNextSetOfOrders(this.nextNoSlotsUrl);
}
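
How forkJoin behaves when one branch returns EMPTY is easy to overlook, so here is a small sketch of it. The string values are placeholders, and of(null) is only one possible stand-in for "no next page"; neither comes from the original answer.

```typescript
import { EMPTY, forkJoin, of } from "rxjs";

// One branch completes without emitting: forkJoin completes without emitting too.
const withEmpty: unknown[] = [];
let withEmptyCompleted = false;
forkJoin({ slots: of("next slots page"), noSlots: EMPTY }).subscribe({
  next: value => withEmpty.push(value),
  complete: () => { withEmptyCompleted = true; }
});

// Emitting a placeholder instead lets forkJoin deliver the other branch's result.
const withPlaceholder: Array<{ slots: string; noSlots: null }> = [];
forkJoin({ slots: of("next slots page"), noSlots: of(null) }).subscribe(value =>
  withPlaceholder.push(value)
);

console.log(withEmpty.length, withEmptyCompleted, withPlaceholder);
```

If the subscriber should run even when only one of the two sets has a next page, the placeholder variant is probably the one you want.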

Angular - chaining Observables and combining their results

The first thing everybody needs to know when starting out with RxJS is: don't subscribe to an observable inside another subscription. (I used to do this all the time too.) There are operators that merge the outputs of observables, and you should learn them.

In this case I will use switchMap inside the pipe for the first observable to execute the second if the isSuccessful from the first result is true. I then combine the two results inside the pipe of the second request - unless there is an error. If so, catchError is used so that only the first result is returned.

// class fields; `any` is used because the response shape is not shown
firstReq = this.http.get<any>("https://myApi.com/posts?userId=1");
secondReq = this.http.get<any>("https://myApi.com/albums?userId=1");

this.firstReq.pipe(
  switchMap((res1) => res1.isSuccessful
    ? this.secondReq.pipe(
        map((res2) => ({ res1, res2 })),
        catchError(() => of({ res1, res2: undefined }))
      )
    : of({ res1, res2: undefined })
  ),
  tap(({ res1, res2 }) => this.combineAndPrintMsg(res1, res2))
).subscribe(); // nothing runs until the observable is subscribed to

combineAndPrintMsg(res1, res2) {
  // fall back to '' so a missing second result does not print "undefined"
  console.log(`${res1.message}${res2?.message ?? ''}`);
}

The choice of switchMap was arbitrary; you should learn the difference between it, concatMap and mergeMap.
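
As a rough illustration of how switchMap, mergeMap and concatMap differ, the sketch below drives the same sequence of events through each operator. The run helper and the Subjects named a and b are made up for this example; Subjects are used so the timing can be controlled by hand.

```typescript
import { Observable, OperatorFunction, Subject } from "rxjs";
import { concatMap, mergeMap, switchMap } from "rxjs/operators";

type Flatten = (project: (key: string) => Observable<string>) => OperatorFunction<string, string>;

// Replays one fixed sequence of outer and inner events through the given operator.
function run(flatten: Flatten): string[] {
  const source = new Subject<string>();
  const inner: Record<string, Subject<string>> = { a: new Subject<string>(), b: new Subject<string>() };
  const seen: string[] = [];
  source.pipe(flatten(key => inner[key])).subscribe(v => seen.push(v));

  source.next("a");      // outer value "a" arrives
  inner.a.next("a1");
  source.next("b");      // outer value "b" arrives while inner "a" is still open
  inner.a.next("a2");
  inner.b.next("b1");
  inner.a.complete();
  inner.b.next("b2");
  inner.b.complete();
  return seen;
}

// switchMap drops inner "a" as soon as "b" arrives.
const withSwitchMap = run(project => switchMap(project)); // ["a1", "b1", "b2"]
// mergeMap keeps both inner observables running side by side.
const withMergeMap = run(project => mergeMap(project));   // ["a1", "a2", "b1", "b2"]
// concatMap subscribes to inner "b" only after "a" completes, so it misses "b1".
const withConcatMap = run(project => concatMap(project)); // ["a1", "a2", "b2"]

console.log(withSwitchMap, withMergeMap, withConcatMap);
```

With HTTP requests, which are cold and emit once, concatMap would not lose a value; it would simply delay the second request until the first has finished.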

How do I effectively chain RxJS Observables

You can use forkJoin to combine the first two observables (they will run in parallel).
Then use switchMap to switch to the third observable.

import { of, forkJoin } from "rxjs";
import { switchMap, delay, map } from "rxjs/operators";

const observableA = of("resized photo").pipe(delay(300));
const observableB = of("thumbnail").pipe(delay(250));
const observableC = of("temp deleted").pipe(delay(100));

// pass the sources as an array; the separate-arguments form is deprecated
const result = forkJoin([observableA, observableB]).pipe(
  switchMap(([resultA, resultB]) => {
    console.log(resultA, resultB);
    console.log('now to observableC');
    return observableC.pipe(map(resultC => [resultA, resultB, resultC]));
  })
);
result.subscribe(x => console.log(x));

Check it out on StackBlitz (don't forget to open the console to see the console.log outputs).

The RxJS docs also have a handy page that helps you choose the right operator for the job: operator-decision-tree

How to properly chain rxjs 6 observables?

You can use switchMap to chain the observables and tap to handle side effects. You also need to subscribe, because these are cold observables.

For error handling, use catchError on every request.

this.apiService.sendPutRequest('/api/users/activate', usrObj).pipe(
  catchError(err => this.errorHandler(err)),
  switchMap(() => this.apiService.sendGetRequest('/api/users/' + this.currentUserId)
    .pipe(catchError(err => this.errorHandler(err)))
  ),
  tap(data => this.setActiveUser(data)),
  switchMap(() => this.apiService.sendGetRequest('api/tasks/user/' + this.currentUserId)
    .pipe(catchError(err => this.errorHandler(err)))
  ),
  tap(tasks => this.taskService.setCurrentUserTasks(tasks))
).subscribe();
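
What errorHandler returns is not shown above; catchError requires its callback to return an observable (or throw). The sketch below assumes a handler that substitutes a fallback value, so the chain keeps going. The failing$ observable and the "no tasks" fallback are invented for illustration.

```typescript
import { Observable, of } from "rxjs";
import { catchError, switchMap } from "rxjs/operators";

// Hypothetical request that always fails.
const failing$ = new Observable<string>(subscriber =>
  subscriber.error(new Error("tasks request failed"))
);

const log: string[] = [];

of("user-1").pipe(
  switchMap(() =>
    failing$.pipe(
      // Handle the error on the inner request and replace it with a fallback value.
      catchError(err => {
        log.push("handled: " + err.message);
        return of("no tasks");
      })
    )
  )
).subscribe({
  next: value => log.push("next: " + value),
  error: () => log.push("error reached subscriber")
});

console.log(log);
```

Because the error is caught on the inner observable, the subscriber's error callback is never invoked in this sketch.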

Chain Observable RxJs in Angular

Use switchMap from rxjs/operators to achieve it.

this.sub = funcA()
  .pipe(
    switchMap(valueFromA => {
      // use valueFromA in here.
      return funcB();
    })
  )
  .subscribe();

Angular 8 and chaining observables

It's kind of hard to interpret this, but something like...

this.getChosenCityId.pipe( // get id
  switchMap(id => this.getCityById(id)), // fetch the city
  withLatestFrom(this.getNearbyCities), // with the latest from get nearby
  switchMap(([city, getNearby]) =>
    getNearby // if get nearby, then get cities by location and map
      ? this.getCitiesByLocation(city.location).pipe(map(nearbyCities => ({city, nearbyCities})))
      : of({city}) // else just return the city
  )
).subscribe(result => console.log(result));

The setup would be a little different if you also want to update automatically when getNearbyCities changes:

// pass the sources as an array; the separate-arguments form is deprecated
combineLatest([this.getChosenCityId, this.getNearbyCities]).pipe( // run when either emits
  switchMap(([id, getNearby]) =>
    this.getCityById(id).pipe(
      switchMap(city =>
        getNearby
          ? this.getCitiesByLocation(city.location).pipe(map(nearbyCities => ({city, nearbyCities})))
          : of({city})
      )
    )
  )
).subscribe(result => console.log(result));
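
The difference between the withLatestFrom and combineLatest setups can be seen in a small sketch. The cityId$ and nearby$ subjects are assumed stand-ins for getChosenCityId and getNearbyCities, whose real types are not shown.

```typescript
import { BehaviorSubject, Subject, combineLatest } from "rxjs";
import { map, withLatestFrom } from "rxjs/operators";

const cityId$ = new Subject<number>();
const nearby$ = new BehaviorSubject<boolean>(false);

const viaWithLatestFrom: string[] = [];
const viaCombineLatest: string[] = [];

// Emits only when cityId$ emits, sampling the current nearby$ value.
cityId$.pipe(
  withLatestFrom(nearby$),
  map(([id, nearby]) => `${id}:${nearby}`)
).subscribe(v => viaWithLatestFrom.push(v));

// Emits when either source emits, once both have produced a value.
combineLatest([cityId$, nearby$]).pipe(
  map(([id, nearby]) => `${id}:${nearby}`)
).subscribe(v => viaCombineLatest.push(v));

cityId$.next(1);
nearby$.next(true); // only the combineLatest pipeline reacts to this
cityId$.next(2);

console.log(viaWithLatestFrom); // ["1:false", "2:true"]
console.log(viaCombineLatest);  // ["1:false", "1:true", "2:true"]
```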

