How can I `await` on an Rx Observable?
You have to pass a promise to await
. Convert the observable's next event to a promise and await that.
if (condition) {
await observable.first().toPromise();
}
Edit note: This answer originally used .take(1) but was changed to use .first() which avoids the issue of the Promise never resolving if the stream ends before a value comes through.As of RxJS v8, toPromise
will be removed. Instead, the above can be replaced with await firstValueFrom(observable)
Is it a good practice using Observable with async/await?
Chaining Observables in sequence, as you want to do in your code
Concerning your code example, if you want to chain Observables (trigger another after the previous emits), use flatMap
(or switchMap
) for this purpose :
this.serviceA.get()
.flatMap((res1: any) => this.serviceB.get())
.flatMap((res2: any) => this.serviceC.get())
.subscribe( (res3: any) => {
....
});
This one is better practice compared to nesting, as this will make things clearer and help you avoid callback hell, that Observable and Promises were supposed to help preventing in the first place.Also, consider using switchMap
instead of flatMap
, basically it will allow to 'cancel' the other requests if the first one emits a new value. Nice to use if the first Observable that triggers the rest is some click event on a button, for instance.
If you don't need your various requests to wait in turn for each other, you can use forkJoin
or zip
to start them all at once, see @Dan Macak answer's for details and other insights.
Angular 'async' pipe and Observables work well together
Concerning Observables and Angular, you can perfectly use | async
pipe in a Angular template instead of subscribing to the Observable in your component code, to get the value(s) emitted by this Observable
ES6 async / await and Promises instead of Observables ?
if you're not feeling using Observable directly, you can simply use .toPromise()
on your Observable, and then some async/await instructions.
If your Observable is supposed to return only one result (as it is the case with basic API calls) , an Observable can be seen as quite equivalent to a Promise.
However, I'm not sure there is any need to do that, considering all the stuff that Observable already provide (to readers : enlightening counter-examples are welcome!) . I would be more in favor of using Observables whenever you can, as a training exercise.
Some interesting blog article on that (and there are plenty of others):
https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875
(emphasis mine)The toPromise function is actually a bit tricky, as it’s not
really an “operator”, rather it’s an RxJS-specific means of
subscribing to an Observable and wrap it in a promise. The promise
will resolve to the last emitted value of the Observable once the
Observable completes. That means that if the Observable emits the
value “hi” then waits 10 seconds before it completes, the returned
promise will wait 10 seconds before resolving “hi”. If the Observable
never completes, then the Promise never resolves.NOTE: using toPromise() is an antipattern except in cases where you’re
dealing with an API that expects a Promise, such as async-await
The example you requested
Example if you really want to do it (probably with some mistakes, can't check right now, please feel free to correct)BTW, it will be nice if anyone can give me an example code to solve
this with async/await :D
// Warning, probable anti-pattern below
async myFunction() {
const res1 = await this.serviceA.get().toPromise();
const res2 = await this.serviceB.get().toPromise();
const res3 = await this.serviceC.get().toPromise();
// other stuff with results
}
In the case you can start all requests simultaneously, await Promise.all()
which should be more efficient, because none of the calls depends on the result of each other. (as would forkJoin
do with Observables)async myFunction() {
const promise1 = this.serviceA.get().toPromise();
const promise2 = this.serviceB.get().toPromise();
const promise3 = this.serviceC.get().toPromise();
let res = await Promise.all([promise1, promise2, promise3]);
// here you can retrieve promises results,
// in res[0], res[1], res[2] respectively.
}
Why is it possible to await an Rx observable?
No, the compiler has no special knowledge of IObservable<T>
. As per section 7.7.7.1 of the C# 5 specification if the object has a method or there is an extension method in scope named GetAwaiter
that returns a type that implements System.Runtime.CompilerServices.INotifyCompletion
, it can be awaited. See Steven Toub's article, Await anything.
More specifically, from the spec
Note how this is similar to howThe task of an await expression is required to be awaitable. An expression t is awaitable if one of the following holds:
- t is of compile time type dynamic
- t has an accessible instance or extension method called GetAwaiter with no parameters and no type parameters, and a return type A for which all of the following hold:
1. A implements the interface System.Runtime.CompilerServices.INotifyCompletion (hereafter known as INotifyCompletion for brevity)
2. A has an accessible, readable instance property IsCompleted of type bool
3. A has an accessible instance method GetResult with no parameters and no type parameters
foreach
does not require IEnumerable<T>
but simply a GetEnumerator method that returns a compatible object. This sort of duck typing is a performance optimization that allows value types to be used by the compiler without boxing. This can be used to avoid unnecessary allocations in performance sensitive code. How to await an asynchonous method defined inside subscribe before to complete an Rxjs Observable?
I would either a) stick to the observable recipe, and convert promises to observables, or b) stick to the promise/async-await pattern, and convert observables to promises. I frankly have no idea how to successfully mix those two.
Rx-based solution:
import { from } from 'rxjs';
import { finalize } from 'rxjs/operators';
myObservable.pipe(
switchMap(() => from(something()),
finalize(() => console.log('All async task are comlpeted')),
).subscribe(someFunction);
in rx.js make source.subscribe await it's observer using async/await
So ultimately what I have found is that what I need is more accurately described as a fifo queue or buffer. I need the messages to wait until the previous message is done processing.
I have also pretty certain that rxjs doesn't offer this ( sometimes referred to as backpressure ). So what I have done is to just import a fifo queue and hook it to each subscriber.
I am using concurrent-queue which so far seems to be working pretty well.
Subscribing to observable sequence with async function
Subscribers are not supposed to be long running, and therefore there isn't support for executing long running async methods in the Subscribe handlers.
Instead, consider your async method to be a single value observable sequence that takes a value from another sequence.
Now you can compose sequences, which is what Rx was designed to do.
Now that you have made that leap, you will probably have something like what @Reijher creates in Howto call back async function from rx subscribe?.
The break down of his code is as follows.
//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
//Project the event you receive, into the result of the async method
.Select(l => Observable.FromAsync(() => asyncMethod(l)))
//Ensure that the results are serialized
.Concat()
//do what you will here with the results of the async method calls
.Subscribe();
In this scenario, you are creating implicit queues.In any problem where the producer is faster than the consumer, a queue will need to be used to collect values while waiting.
Personally I prefer to make this explicit by putting data into a queue.
Alternatively you could explicitly use a Scheduler to signal that is the threading model that should be picking up the slack.
This seems to be a popular hurdle (executing async in a subscribe handler) for Rx newcomers.
There are many reasons that the guidance is to not put them in your subscriber, for example:
1. you break the error model
2. you are mixing async models (rx here, task there)
3. subscribe is the consumer of a composition of async sequences. An async method is just a single value sequence, so by that view cant be the end of the sequence, it's result might be though.
UPDATE
To illustrate the comment about breaking the error model here is an update of the OP sample.
void Main()
{
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
var d = ob.Subscribe(
x => ConsumeThrows(x).Wait(),
ex=> Console.WriteLine("I will not get hit"));
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> ConsumeThrows(long count)
{
return await Task.FromException<Unit>(new Exception("some failure"));
//this will have the same effect of bringing down the application.
//throw new Exception("some failure");
}
Here we can see that if the OnNext
handler was to throw, then we are not protected by our Rx OnError
handler. The exception would be unhandled and most likely bring down the application.
rxjs use async/await in map Rx.Observable.range(1, 5).map
It's a slightly old question. You might have got the answer already, but I thought it's a nice challenge, which turned out to be a surprisingly simple one too!
Just switch from map()
to concatMap()
.
And you won't even need the async
/ await
bit inside the mapping function. Although keeping or removing it doesn't seem to change anything for me.
So, the relevant bit can be simplified to:
async function create() {
var list = await Rx.Observable.range(1, 5)
.concatMap(num => getData(num))
.toArray().toPromise();
return list;
}
And again, you can keep it exactly as you had it in the question, and just change map
to concatMap
. I just wanted to see how far this can goHere's a working example
Check the console at the bottom of the page for the results.
How to await inside RxJS subscribe method
You do not need to use await
, nor need to convert your Promise
to an Observable
.
CF this Tweet from Ben Lesh :
Here's an example with a mock for the function
saveToDatabase
:(and the working Plunkr : https://plnkr.co/edit/7SDLvRS2aTw9gYWdIznS?p=preview)
const { Observable } = Rx;
const saveToDatabase = (date) =>
new Promise(resolve =>
setTimeout(() =>
resolve(`${date} has been saved to the database`),
1000));
const date$ = Observable.of(new Date()).delay(1000);
date$
.do(x => console.log(`date received, trying to save it to database ...`))
.switchMap(date => saveToDatabase(date))
.do(console.log)
.subscribe();
Output :Related Topics
JavaScript Contenteditable - Set Cursor/Caret to Index
How to Execute a Function on Pressing the Enter Key in an <Input> Field
Finding "Line-Breaks" in Textarea That Is Word-Wrapping Arabic Text
Jquery Attr('Checked','Checked') Works Only Once
How to Distinguish Mouse "Click" and "Drag"
How to Pass Jsf Managed Bean Properties to a JavaScript Function
Why and When to Use Default Export Over Named Exports in Es6 Modules
Google Spreadsheet Script - How to Transpose/Rotate Multi-Dimensional Array
Throttle Amount of Promises Open at a Given Time
How to Adapt Trampolines to Continuation Passing Style
How to Send Images to Node Js with Axios
Executing Code at Page-Level from Background.Js and Returning the Value
How to Replace Last Occurrence of Characters in a String Using JavaScript
What's "This" in JavaScript Onclick