Chaining Observables in RxJS
Promise composition versus RxJS is a frequently asked question, so you can refer to a number of earlier questions on SO, among them:
- How to do the chain sequence in rxjs
- RxJS Promise Composition (passing data)
- RxJS sequence equvalent to promise.then()?
Basically, flatMap is the equivalent of Promise.then. (In current RxJS versions the operator is called mergeMap; flatMap is a deprecated alias for it.)
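As a rough illustration of that equivalence, the sketch below chains the same two steps once with Promise.then and once with mergeMap. The lookup functions (userPromise, postsPromise, user$, posts$) are hypothetical stand-ins for real asynchronous calls and are not part of the original question.

```typescript
import { Observable, of } from "rxjs";
import { mergeMap } from "rxjs/operators";

// Hypothetical lookups standing in for real asynchronous calls.
const userPromise = (id: number): Promise<string> => Promise.resolve(`user-${id}`);
const postsPromise = (user: string): Promise<string[]> => Promise.resolve([`${user}/post-1`]);

const user$ = (id: number): Observable<string> => of(`user-${id}`);
const posts$ = (user: string): Observable<string[]> => of([`${user}/post-1`]);

// Promise style: .then receives the previous result and returns the next promise.
const viaPromise: Promise<string[]> = userPromise(1).then(user => postsPromise(user));

// Observable style: mergeMap receives the previous value and returns the next observable.
const viaObservable: Observable<string[]> = user$(1).pipe(mergeMap(user => posts$(user)));

// Nothing runs on the observable side until someone subscribes.
const collected: string[][] = [];
viaObservable.subscribe(posts => collected.push(posts));

viaPromise.then(posts => console.log("promise:", posts));
console.log("observable:", collected);
```

The main practical difference is that the promise starts working as soon as it is created, while the observable chain stays idle until subscribe is called.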
For your second question, do you want to replay values that have already been emitted, or do you want to process new values as they arrive? In the first case, check the publishReplay operator. In the second case, a standard subscription is enough. However, you may need to be aware of the cold vs. hot dichotomy, depending on your source (cf. "Hot and Cold observables: are there 'hot' and 'cold' operators?" for an illustrated explanation of the concept).
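To make the cold vs. hot distinction concrete, here is a minimal sketch. It uses shareReplay rather than publishReplay (a deliberate swap: shareReplay gives a similar replay effect without a separate connect step), and the counter and variable names are made up for the illustration.

```typescript
import { Observable } from "rxjs";
import { shareReplay } from "rxjs/operators";

let producerRuns = 0;

// Cold source: the producer function runs once for every subscriber.
const cold$ = new Observable<number>(subscriber => {
  producerRuns++;
  subscriber.next(1);
  subscriber.next(2);
  // Deliberately never completes, like a long-lived stream.
});

// Two plain subscriptions trigger the producer twice.
cold$.subscribe();
cold$.subscribe();
const runsWhenCold = producerRuns;

// Shared source: one upstream subscription, last 2 values replayed to late subscribers.
const replayed$ = cold$.pipe(shareReplay(2));

const early: number[] = [];
const late: number[] = [];
replayed$.subscribe(v => early.push(v));
replayed$.subscribe(v => late.push(v)); // subscribes after 1 and 2 were already emitted

const runsWhenShared = producerRuns - runsWhenCold;
console.log({ runsWhenCold, runsWhenShared, early, late });
```

If only new values matter, the late subscriber could subscribe to a plain shared source instead and would simply miss what was emitted before it arrived.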
How do I chain Angular observables?
You have multiple nested subscriptions. They lead to multiple open subscriptions that may never be closed. Instead, use the RxJS operators available to reduce them to a single subscription.
Since you need to trigger two requests in parallel, you could also use the RxJS forkJoin function.
Refer here for a quick run down.
In short:
- switchMap operator to map from one observable to another
- filter operator to continue the operator chain based on a condition
- forkJoin to combine and trigger multiple observables in parallel
Try the following:
import { Observable, forkJoin, timer } from "rxjs";
import { filter, switchMap } from "rxjs/operators";

ngOnInit(): void {
  this.subscription = timer(0, 120 * 1000).pipe(
    switchMap(() => this.shopService.getPreferencesAsObservable()),
    filter(preferences => !!preferences), // emit only if `preferences` is defined and truthy
    switchMap(() =>
      forkJoin({
        slots: this.getInitialPendingOrders(true),
        noSlots: this.getInitialPendingOrders(false)
      })
    )
  ).subscribe({
    next: ({ slots, noSlots }) => {
      // do stuff with orders from `slots` and `noSlots` responses
    },
    error: (error: any) => {
      // handle error
    }
  });
}

getInitialPendingOrders(slots: boolean): Observable<any> {
  // The parentheses matter: `+` binds tighter than `?:`, so without them
  // the whole concatenated string would become the condition.
  return this.apiService.fetchShopOrders("status=PENDING" + (slots ? "&only_slots=true" : ""));
}
Update
As a rule of thumb, you should return the observable and subscribe only where its response is required. In your case you could pipe a switchMap into each argument of the forkJoin and return an observable conditionally. When you do not wish to return anything, return the RxJS constant EMPTY. Note that forkJoin emits only once, when all its source observables have completed, and only if each of them has emitted at least one value. A source that resolves to EMPTY therefore makes the forkJoin complete without emitting for that cycle.
import { EMPTY, Observable, forkJoin, timer } from "rxjs";
import { filter, switchMap } from "rxjs/operators";

ngOnInit(): void {
  this.subscription = timer(0, 120 * 1000).pipe(
    switchMap(() => this.shopService.getPreferencesAsObservable()),
    filter(preferences => !!preferences), // emit only if `preferences` is defined and truthy
    switchMap(() =>
      forkJoin({
        slots: this.getInitialPendingOrders(true).pipe(
          switchMap((orders: any) => {
            /* do stuff with orders */
            return orders.next_page ? this.getNextSlotsSetOfPendingOrders() : EMPTY;
          })
        ),
        noSlots: this.getInitialPendingOrders(false).pipe(
          switchMap((orders: any) => {
            /* do stuff with orders */
            return orders.next_page ? this.getNextNoSlotsSetOfPendingOrders() : EMPTY;
          })
        )
      })
    )
  ).subscribe({
    next: ({ slots, noSlots }) => {
      // do stuff with next set of orders from `slots` and `noSlots`
    },
    error: (error: any) => {
      // handle error
    }
  });
}

getInitialPendingOrders(slots: boolean): Observable<any> {
  // The parentheses matter: `+` binds tighter than `?:`.
  return this.apiService.fetchShopOrders("status=PENDING" + (slots ? "&only_slots=true" : ""));
}

getNextSlotsSetOfPendingOrders(): Observable<any> {
  return this.apiService.fetchNextSetOfOrders(this.nextSlotsUrl);
}

getNextNoSlotsSetOfPendingOrders(): Observable<any> {
  return this.apiService.fetchNextSetOfOrders(this.nextNoSlotsUrl);
}
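How forkJoin treats a source that returns EMPTY is easy to get wrong, so here is a small standalone sketch of it. The string values and the log array are invented for the illustration; only forkJoin, of, and EMPTY come from RxJS.

```typescript
import { EMPTY, forkJoin, of } from "rxjs";

const log: string[] = [];

// Both sources emit before completing: forkJoin emits their last values once.
forkJoin({ slots: of("s1", "s2"), noSlots: of("n1") }).subscribe({
  next: v => log.push(`next ${v.slots} ${v.noSlots}`),
  complete: () => log.push("complete"),
});

// One source is EMPTY: forkJoin completes without ever calling next.
forkJoin({ slots: of("s1"), noSlots: EMPTY }).subscribe({
  next: () => log.push("next (not reached)"),
  complete: () => log.push("complete without value"),
});

// A placeholder value such as of(null) keeps the combined emission alive.
forkJoin({ slots: of("s1"), noSlots: of(null) }).subscribe({
  next: v => log.push(`next ${v.slots} ${v.noSlots}`),
});

console.log(log);
```

So if the subscriber should still receive the branch that does have a next page, returning of(null) (or another placeholder) instead of EMPTY is one option; whether that is desirable depends on how the next handler is written.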
Angular - chaining Observables and combining their results
The first thing everybody needs to know when starting out with RxJS is: don't subscribe to an observable inside another observable's subscription. (I used to do this all the time too.) There are operators that merge the outputs of observables, and you should learn them.
In this case I will use switchMap inside the pipe of the first observable to execute the second one if isSuccessful from the first result is true. I then combine the two results inside the pipe of the second request, unless there is an error. If there is one, catchError ensures that only the first result is returned.
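import { of } from "rxjs";
import { catchError, map, switchMap, tap } from "rxjs/operators";

firstReq = this.http.get("https://myApi.com/posts?userId=1");
secondReq = this.http.get("https://myApi.com/albums?userId=1");

this.firstReq.pipe(
  switchMap((res1) => res1.isSuccessful
    ? this.secondReq.pipe(
        map((res2) => ({ res1, res2 })),
        // if the second request fails, fall back to the first result only
        catchError(() => of({ res1, res2: undefined }))
      )
    : of({ res1, res2: undefined })
  ),
  tap(({ res1, res2 }) => this.combineAndPrintMsg(res1, res2))
).subscribe(); // the requests are cold, so nothing runs until you subscribe

combineAndPrintMsg(res1, res2) {
  // avoid printing the literal "undefined" when there is no second result
  console.log(`${res1.message}${res2?.message ?? ""}`);
}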
The choice of switchMap was arbitrary. You should learn the difference between it, concatMap, and mergeMap.
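One way to see that difference is to feed the same sequence of events through each operator. The sketch below does this with Subjects so the timing is fully controlled; the trace helper and the event names ("a1", "b1", "b2") are made up for the illustration.

```typescript
import { Observable, OperatorFunction, Subject } from "rxjs";
import { concatMap, mergeMap, switchMap } from "rxjs/operators";

type Project = (key: string) => Observable<string>;

// Runs one fixed scenario through the given flattening operator and records what comes out.
function trace(flatten: (project: Project) => OperatorFunction<string, string>): string[] {
  const outer = new Subject<string>();
  const inner: { [key: string]: Subject<string> } = {
    a: new Subject<string>(),
    b: new Subject<string>(),
  };
  const seen: string[] = [];
  outer.pipe(flatten(key => inner[key])).subscribe(v => seen.push(v));

  outer.next("a");     // inner "a" is subscribed
  outer.next("b");     // a second value arrives while "a" is still open
  inner.a.next("a1");
  inner.b.next("b1");
  inner.a.complete();
  inner.b.next("b2");
  return seen;
}

// switchMap drops "a" as soon as "b" arrives.
const withSwitchMap = trace(project => switchMap(project));
// mergeMap keeps both inner observables running side by side.
const withMergeMap = trace(project => mergeMap(project));
// concatMap waits for "a" to complete before subscribing to "b".
const withConcatMap = trace(project => concatMap(project));

console.log({ withSwitchMap, withMergeMap, withConcatMap });
```

Note that concatMap misses "b1" here only because the inner sources are hot Subjects; with cold sources such as HTTP requests, the queued request would simply start later and nothing would be lost.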
How do I effectively chain RxJS Observables
You can use forkJoin to combine the first 2 observables (they will run in parallel).
Then use switchMap to switch to the 3rd observable.
import { of, forkJoin } from "rxjs";
import { switchMap, delay, map } from "rxjs/operators";

const observableA = of("resized photo").pipe(delay(300));
const observableB = of("thumbnail").pipe(delay(250));
const observableC = of("temp deleted").pipe(delay(100));

// Pass the sources as an array; the variadic form forkJoin(a, b) is deprecated.
const result = forkJoin([observableA, observableB]).pipe(
  switchMap(([resultA, resultB]) => {
    console.log(resultA, resultB);
    console.log('now to observableC');
    // carry the first two results along with the third
    return observableC.pipe(map(resultC => [resultA, resultB, resultC]));
  })
);

result.subscribe(x => console.log(x));
Check it out on StackBlitz (don't forget to open the console to see the console.log outputs).
The RxJS docs also have a handy page that helps to choose the right operator for the job: operator-decision-tree
How to properly chain rxjs 6 observables?
You can use switchMap to chain the requests and tap to handle side effects. You also need to subscribe, because the observable is cold.
For error handling, use catchError on every request:
this.apiService.sendPutRequest('/api/users/activate', usrObj).pipe(
  catchError(err => this.errorHandler(err)),
  switchMap(() => this.apiService.sendGetRequest('/api/users/' + this.currentUserId)
    .pipe(catchError(err => this.errorHandler(err)))
  ),
  tap(data => this.setActiveUser(data)),
  switchMap(() => this.apiService.sendGetRequest('api/tasks/user/' + this.currentUserId)
    .pipe(catchError(err => this.errorHandler(err)))
  ),
  tap(tasks => this.taskService.setCurrentUserTasks(tasks))
).subscribe()
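What the rest of the chain does after an error depends on what errorHandler returns, which the answer does not show. The sketch below assumes a handler that records the error and returns EMPTY; under that assumption the later steps are skipped and the stream completes quietly. The stand-in observables (activate$, failingUser$, tasks$) and the events array are hypothetical.

```typescript
import { EMPTY, Observable, of } from "rxjs";
import { catchError, switchMap, tap } from "rxjs/operators";

const events: string[] = [];

// Hypothetical handler: record the error, then complete without emitting.
function errorHandler(err: unknown): Observable<never> {
  events.push(`handled: ${(err as Error).message}`);
  return EMPTY;
}

// Stand-ins for the HTTP calls; the second one fails.
const activate$ = of({ ok: true });
const failingUser$ = new Observable<string>(subscriber =>
  subscriber.error(new Error("user lookup failed"))
);
const tasks$ = of(["task-1"]);

activate$.pipe(
  catchError(err => errorHandler(err)),
  switchMap(() => failingUser$.pipe(catchError(err => errorHandler(err)))),
  tap(user => events.push(`user: ${user}`)),           // skipped: no value arrives
  switchMap(() => tasks$),
  tap(tasks => events.push(`tasks: ${tasks.length}`))  // skipped as well
).subscribe({ complete: () => events.push("complete") });

console.log(events);
```

A handler that returns a fallback value with of(...) would instead let the following steps run with that value, and one that rethrows would send the error to the subscriber's error callback.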
Chain Observable RxJs in Angular
Use switchMap from rxjs/operators to achieve it.
this.sub = funcA()
  .pipe(
    switchMap(valueFromA => {
      // use valueFromA in here.
      return funcB();
    })
  )
  .subscribe();
Angular 8 and chaining observables
It's kind of hard to interpret this, but something like:
this.getChosenCityId.pipe( // get id
  switchMap(id => this.getCityById(id)), // fetch the city
  withLatestFrom(this.getNearbyCities), // with the latest from get nearby
  switchMap(([city, getNearby]) =>
    getNearby // if get nearby, then get cities by location and map
      ? this.getCitiesByLocation(city.location).pipe(map(nearbyCities => ({city, nearbyCities})))
      : of({city}) // else just return the city
  )
).subscribe(result => console.log(result))
The setup would be a little different if you also want to update automatically when getNearbyCities changes:
// sources go in an array; the variadic form combineLatest(a, b) is deprecated
combineLatest([this.getChosenCityId, this.getNearbyCities]).pipe( // run when either emits
  switchMap(([id, getNearby]) =>
    this.getCityById(id).pipe(
      switchMap(city =>
        getNearby
          ? this.getCitiesByLocation(city.location).pipe(map(nearbyCities => ({city, nearbyCities})))
          : of({city})
      )
    )
  )
).subscribe(result => console.log(result))
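The difference between the two setups comes down to which source is allowed to trigger an emission. Here is a minimal sketch with made-up subjects (cityId$ and nearby$ are stand-ins for getChosenCityId and getNearbyCities):

```typescript
import { BehaviorSubject, Subject, combineLatest } from "rxjs";
import { withLatestFrom } from "rxjs/operators";

// Hypothetical stand-ins for the chosen city id and the "nearby" toggle.
const cityId$ = new Subject<number>();
const nearby$ = new BehaviorSubject<boolean>(false);

const viaWithLatestFrom: string[] = [];
const viaCombineLatest: string[] = [];

// withLatestFrom: only cityId$ triggers; nearby$ is merely sampled.
cityId$.pipe(withLatestFrom(nearby$))
  .subscribe(([id, nearby]) => viaWithLatestFrom.push(`${id}:${nearby}`));

// combineLatest: either source triggers once both have emitted.
combineLatest([cityId$, nearby$])
  .subscribe(([id, nearby]) => viaCombineLatest.push(`${id}:${nearby}`));

cityId$.next(1);    // both variants emit
nearby$.next(true); // only the combineLatest variant emits
cityId$.next(2);    // both variants emit

console.log({ viaWithLatestFrom, viaCombineLatest });
```

With combineLatest, toggling the flag re-runs the downstream switchMap, which in the answer's second version means the city is fetched again.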