How to 'Await' on an Rx Observable

How can I `await` on an Rx Observable?

You have to pass a promise to await. Convert the observable's next event to a promise and await that.

if (condition) {
  await observable.first().toPromise();
}

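Edit note: This answer originally used .take(1) but was changed to use .first(). With .take(1), a stream that completes before a value comes through leaves the promise resolving with undefined; with .first(), the promise rejects with an EmptyError, so the empty case cannot pass silently.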

toPromise is deprecated as of RxJS v7 and is slated for removal in v8. From v7 onward, the above can be replaced with await firstValueFrom(observable), where firstValueFrom is imported from 'rxjs'.
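To make the "first value as a promise" idea concrete, here is a minimal sketch that does not depend on RxJS at all. The SimpleObservable interface and the firstValueOf and fromValues helpers are hypothetical stand-ins written for this illustration; they are not the RxJS implementation, only an approximation of the behaviour described above (resolve on the first value, reject if the source completes empty).

```typescript
// Minimal stand-in types. These are NOT the RxJS classes.
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}
interface Subscription {
  unsubscribe(): void;
}
interface SimpleObservable<T> {
  subscribe(observer: Observer<T>): Subscription;
}

// Hypothetical helper: resolves with the first emitted value,
// rejects if the source errors or completes without emitting.
function firstValueOf<T>(source: SimpleObservable<T>): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    let settled = false;
    let subscription: Subscription | undefined;
    subscription = source.subscribe({
      next: (value) => {
        if (settled) return;
        settled = true;
        resolve(value);
        // Still undefined here if the source emits synchronously.
        subscription?.unsubscribe();
      },
      error: (err) => {
        if (!settled) {
          settled = true;
          reject(err);
        }
      },
      complete: () => {
        if (!settled) {
          settled = true;
          reject(new Error('no elements in sequence'));
        }
      },
    });
    // A synchronous source settles before `subscription` is assigned; clean up now.
    if (settled) subscription.unsubscribe();
  });
}

// Hypothetical synchronous source used only for the demo.
function fromValues<T>(values: T[]): SimpleObservable<T> {
  return {
    subscribe(observer) {
      values.forEach((v) => observer.next(v));
      observer.complete();
      return { unsubscribe() {} };
    },
  };
}

firstValueOf(fromValues([10, 20, 30])).then((v) => console.log(v)); // logs 10
```

With a real RxJS Observable you would call the library's own firstValueFrom instead; the sketch only shows why an empty stream has to end in a rejection rather than a promise that hangs.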

Is it a good practice using Observable with async/await?

Chaining Observables in sequence, as you want to do in your code

Concerning your code example, if you want to chain Observables (trigger another after the previous one emits), use flatMap (an alias of mergeMap) or switchMap for this purpose:

this.serviceA.get()
  .flatMap((res1: any) => this.serviceB.get())
  .flatMap((res2: any) => this.serviceC.get())
  .subscribe((res3: any) => {
    ....
  });

This is better practice than nesting subscriptions: it makes the flow clearer and helps you avoid callback hell, which Observables and Promises were supposed to prevent in the first place.

Also, consider using switchMap instead of flatMap: when the source Observable emits a new value, switchMap unsubscribes from the inner request that is still in flight, effectively 'cancelling' it. This is useful if the first Observable that triggers the rest is, for instance, a click event on a button.

If you don't need your various requests to wait in turn for each other, you can use forkJoin or zip to start them all at once; see @Dan Macak's answer for details and other insights.


Angular 'async' pipe and Observables work well together

Concerning Observables and Angular, you can use the | async pipe in an Angular template to get the value(s) emitted by an Observable, instead of subscribing to it in your component code.


ES2017 async / await and Promises instead of Observables?

If you don't feel like using Observables directly, you can simply call .toPromise() on your Observable and then use async/await.

If your Observable is supposed to emit only one result (as is the case with basic API calls), it can be seen as roughly equivalent to a Promise.

However, I'm not sure there is any need to do that, considering everything that Observables already provide (to readers: enlightening counter-examples are welcome!). I would favor using Observables whenever you can, as a training exercise.


An interesting blog article on that (there are plenty of others):

https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875

The toPromise function is actually a bit tricky, as it’s not
really an “operator”, rather it’s an RxJS-specific means of
subscribing to an Observable and wrap it in a promise. The promise
will resolve to the last emitted value of the Observable once the
Observable completes. That means that if the Observable emits the
value “hi” then waits 10 seconds before it completes, the returned
promise will wait 10 seconds before resolving “hi”. If the Observable
never completes, then the Promise never resolves.

NOTE: using toPromise() is an antipattern except in cases where you’re
dealing with an API that expects a Promise, such as async-await

(emphasis mine)


The example you requested

BTW, it will be nice if anyone can give me an example code to solve
this with async/await :D

Example if you really want to do it (probably with some mistakes, can't check right now, please feel free to correct)

// Warning, probable anti-pattern below
async myFunction() {
  const res1 = await this.serviceA.get().toPromise();
  const res2 = await this.serviceB.get().toPromise();
  const res3 = await this.serviceC.get().toPromise();
  // other stuff with results
}

If none of the calls depends on the result of another, you can start all requests simultaneously and await Promise.all(), which should be more efficient (this is what forkJoin does with Observables).

async myFunction() {
  const promise1 = this.serviceA.get().toPromise();
  const promise2 = this.serviceB.get().toPromise();
  const promise3 = this.serviceC.get().toPromise();

  let res = await Promise.all([promise1, promise2, promise3]);

  // here you can retrieve promises results,
  // in res[0], res[1], res[2] respectively.
}
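The difference between the two variants is easy to see with plain promises. The sketch below assumes a hypothetical fakeRequest helper that stands in for serviceA/B/C and simply resolves after a delay; the names and the 50 ms figure are made up for the illustration.

```typescript
// Hypothetical stand-in for a service call: resolves with `label` after `ms` milliseconds.
function fakeRequest(label: string, ms: number): Promise<string> {
  return new Promise((resolve) => setTimeout(() => resolve(label), ms));
}

// Each request starts only after the previous one has resolved.
async function sequential(): Promise<string[]> {
  const a = await fakeRequest('A', 50);
  const b = await fakeRequest('B', 50);
  const c = await fakeRequest('C', 50);
  return [a, b, c];
}

// All three requests are started up front, then awaited together.
// Promise.all keeps the results in the order of the input array.
async function parallel(): Promise<string[]> {
  return Promise.all([
    fakeRequest('A', 50),
    fakeRequest('B', 50),
    fakeRequest('C', 50),
  ]);
}
```

Both functions resolve to the same array, but the sequential version takes roughly the sum of the delays while the parallel one takes roughly the longest single delay.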

Why is it possible to await an Rx observable?

No, the compiler has no special knowledge of IObservable<T>. As per section 7.7.7.1 of the C# 5 specification, if the object has a method, or there is an extension method in scope, named GetAwaiter that returns a type implementing System.Runtime.CompilerServices.INotifyCompletion, it can be awaited. See Stephen Toub's article, Await anything.

More specifically, from the spec

The task of an await expression is required to be awaitable. An expression t is awaitable if one of the following holds:

- t is of compile time type dynamic

- t has an accessible instance or extension method called GetAwaiter with no parameters and no type parameters, and a return type A for which all of the following hold:

1. A implements the interface System.Runtime.CompilerServices.INotifyCompletion (hereafter known as INotifyCompletion for brevity)

2. A has an accessible, readable instance property IsCompleted of type bool

3. A has an accessible instance method GetResult with no parameters and no type parameters

Note how this is similar to how foreach does not require IEnumerable<T> but simply a GetEnumerator method that returns a compatible object. This sort of duck typing is a performance optimization that allows value types to be used by the compiler without boxing. This can be used to avoid unnecessary allocations in performance sensitive code.

How to await an asynchronous method defined inside subscribe before completing an RxJS Observable?

I would either a) stick to the observable recipe, and convert promises to observables, or b) stick to the promise/async-await pattern, and convert observables to promises. I frankly have no idea how to successfully mix those two.

Rx-based solution:

import { from } from 'rxjs';
import { finalize, switchMap } from 'rxjs/operators';

myObservable.pipe(
  // Convert the promise returned by something() into an inner observable
  switchMap(() => from(something())),
  finalize(() => console.log('All async tasks are completed')),
).subscribe(someFunction);

In RxJS, make source.subscribe await its observer using async/await

So ultimately what I have found is that what I need is more accurately described as a fifo queue or buffer. I need the messages to wait until the previous message is done processing.

I am also pretty certain that rxjs doesn't offer this (sometimes referred to as backpressure). So what I have done is import a FIFO queue and hook it up to each subscriber.

I am using concurrent-queue which so far seems to be working pretty well.
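For readers who want to see the idea without pulling in a package, here is a rough sketch of such a queue. SerialQueue is a hand-rolled, hypothetical class written for this illustration; it is not the concurrent-queue API. Each pushed item is chained onto the promise of the previous one, so a message is only processed once the one before it has finished.

```typescript
// Hand-rolled serial (FIFO) queue; NOT the concurrent-queue package's API.
class SerialQueue<T> {
  // Promise that settles when everything queued so far has been processed.
  private tail: Promise<void> = Promise.resolve();
  private worker: (item: T) => Promise<void>;

  constructor(worker: (item: T) => Promise<void>) {
    this.worker = worker;
  }

  // Enqueue an item; it is processed only after every earlier item has finished.
  push(item: T): Promise<void> {
    const run = this.tail.then(() => this.worker(item));
    // Swallow failures on the chain so one bad item does not block later ones.
    this.tail = run.catch(() => undefined);
    return run;
  }
}

// Demo: later items have shorter delays, yet they still finish in push order.
async function demo(): Promise<number[]> {
  const finished: number[] = [];
  const queue = new SerialQueue<number>(async (ms) => {
    await new Promise<void>((resolve) => setTimeout(resolve, ms));
    finished.push(ms);
  });
  await Promise.all([queue.push(30), queue.push(10), queue.push(0)]);
  return finished; // [30, 10, 0]
}
```

In a subscriber, the handler would just hand each message to the queue (something like msg => queue.push(msg)) and return immediately. Note that the queue is unbounded, so a producer that stays faster than the consumer will keep growing it.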

Subscribing to observable sequence with async function

Subscribers are not supposed to be long running, and therefore there isn't support for executing long running async methods in the Subscribe handlers.

Instead, consider your async method to be a single value observable sequence that takes a value from another sequence.
Now you can compose sequences, which is what Rx was designed to do.

Now that you have made that leap, you will probably have something like what @Reijher creates in Howto call back async function from rx subscribe?.

The breakdown of his code is as follows.

// The input sequence. Produces values potentially quicker than the consumer
Observable.Interval(TimeSpan.FromSeconds(1))
    // Project the event you receive into the result of the async method
    .Select(l => Observable.FromAsync(() => asyncMethod(l)))
    // Ensure that the results are serialized
    .Concat()
    // Do what you will here with the results of the async method calls
    .Subscribe();

In this scenario, you are creating implicit queues.
In any problem where the producer is faster than the consumer, a queue will need to be used to collect values while waiting.
Personally I prefer to make this explicit by putting data into a queue.
Alternatively, you could explicitly use a Scheduler to signal which threading model should pick up the slack.

This seems to be a popular hurdle (executing async in a subscribe handler) for Rx newcomers.
There are many reasons that the guidance is to not put them in your subscriber, for example:
1. you break the error model
2. you are mixing async models (rx here, task there)
3. subscribe is the consumer of a composition of async sequences. An async method is just a single-value sequence, so by that view it can't be the end of the sequence; its result might be, though.

UPDATE

To illustrate the comment about breaking the error model, here is an update of the OP's sample.

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex => Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    // This will have the same effect of bringing down the application.
    // throw new Exception("some failure");
}

Here we can see that if the OnNext handler were to throw, we would not be protected by our Rx OnError handler.
The exception would be unhandled and most likely bring down the application.

rxjs use async/await in map Rx.Observable.range(1, 5).map

It's a slightly old question. You might have got the answer already, but I thought it's a nice challenge, which turned out to be a surprisingly simple one too!

Just switch from map() to concatMap().

And you won't even need the async / await bit inside the mapping function. Although keeping or removing it doesn't seem to change anything for me.

So, the relevant bit can be simplified to:

async function create() {
  var list = await Rx.Observable.range(1, 5)
    .concatMap(num => getData(num))
    .toArray().toPromise();

  return list;
}

And again, you can keep it exactly as you had it in the question, and just change map to concatMap. I just wanted to see how far this can go.


How to await inside RxJS subscribe method

You do not need to use await, nor need to convert your Promise to an Observable.


Ben Lesh has made the same point in a tweet.


Here's an example with a mock for the function saveToDatabase :

(and the working Plunkr : https://plnkr.co/edit/7SDLvRS2aTw9gYWdIznS?p=preview)

const { Observable } = Rx;

const saveToDatabase = (date) =>
  new Promise(resolve =>
    setTimeout(() =>
      resolve(`${date} has been saved to the database`),
      1000));

const date$ = Observable.of(new Date()).delay(1000);

date$
  .do(x => console.log(`date received, trying to save it to database ...`))
  .switchMap(date => saveToDatabase(date))
  .do(console.log)
  .subscribe();



