Observable Stack and Queue

Observable Stack and Queue

With Stacks and Queues (almost by definition) you only have access to the top of the stack or head of the queue. It's what differentiates them from a List. (and so, that's why you haven't found one)

To answer though you could write your own, I would do it by deriving from ObservableCollection, then in the case of a stack implementing the Push as an Insert at offset 0 (and pop as returning index 0 then RemoveAt index 0); or with a queue you could just Add to the end of the list to Enqueue, and the grab and remove the first item, as with the stack, for Dequeue. The Insert, Add and RemoveAt operations would be called on the underlying ObservableCollection and so cause the CollectionChanged event to be fired.


You might also be saying that you simply want to bind or be notified when the one item you are supposed to have access to changes. You would create your own class again, derived from Stack or Queue, and fire the CollectionChanged event manually when:

  • Something is pushed onto or popped from a stack
  • Something is dequeued from a queue
  • Something is queued on the queue, when the queue was previously empty

Using ObservableCollectionT as a FIFO stack

I think what you're fundamentally looking for is a queue. A queue is a First In, First Out mechanism (as opposed to a Stack, which is First In, Last Out mechanism.)

Is this a reasonable way to queue Observables?

is there a TECHNICAL reason this approach is flawed

Yes. As @frosty points out, you could run into a race condition when getItems() is executed multiple times, since you are storing data "outside of the stream", the state of this.myItem is dependent on the order in which your http requests return.

While this may work most of the time, it is not completely deterministic.

I am getting lost converting the trivial example into something that applies to me

I get it. RxJS is hard... at first :-)

One thing that helped me a lot in becoming proficient is to realize that:

  1. Observables on their own, are pretty boring
  2. RxJS makes working with them worthwhile
  • this is because there are so many operators and static functions that allow you easily create observable sources with well defined behavior.

  1. There are essentially 2 features of an observable: what and when
  • what is the shape of data it emits ?
  • when will it emit this data ?

  1. You can break observables down into smaller parts, which makes understanding and debugging much easier!

Let's take your initial example code (note: for sake of clarity for future readers I've renamed projectRev to item):

export class SomeComponent  {

public myItem : Item;
public myProject : Project;

ngOnInit() {
this.getItem();
}

getItem() {
let itemId = this.activatedRoute.snapshot.params.id;

this.itemSvc.getItemById(itemId).subscribe(
data => {
this.myItem = data;
this.getProject();
}
);
}

getProject() {
this.projectSvc.getProjectById(this.myItem.ProjectId).subscribe(
data => this.myProject = data
);
}

}

Let's design a single observable that emits exactly the data you want, exactly when you want it!

Thinking of this ahead of time makes life much easier.

For the sake of example, let's say you want to emit an Item with its parent Project attached. So,

  • what : Item object with parent Project appended
  • when : should emit whenever the source item is changed (id is different)

To accomplish this, we can define all the individual parts as separate observables. Angular provides the route params as an observable, so rather than using .snapshot which represents the state at one moment in time, let's define an itemId$ observable that will emit when the param changes:

this.itemId$ = this.activatedRoute.params.pipe(pluck('id'));

Let's also define myItem as observable. We would like myItem$ to emit the current Item (what), whenever the id route parm changes (when):

this.myItem$ = this.itemId$.pipe(
switchMap(itemId => this.itemSvc.getItemById(itemId))
);

At first, switchMap may seem confusing (I know it was for me). Here's what it does:

  • it internally subscribes to an observable source and emits its emissions
  • each time it receives a new emission, it will stop listening to the previous source and subscribe to a new source
  • in your case, we provided a function that takes the received emission from itemId$ and returns an observable. This observable is the call to this.itemSvc.getItemsById()

So, hopefully you can see that whenever itemId$ emits an id, myItem$ will emit the result of itemSvc.getItemById(), which is the Item object.

Notice, there is no subscription (this is handled internally by switchMap). Notice there is no need to stash the result in a separate local variable this.myItem, which was the cause of your possible race condition.

Next, let's define an observable that emits our Item with an additional project property (what) whenever a new Item is emitted (when):

For the sake of verbosity:

this.myItemWithProject$ = this.myItem$.pipe(
switchMap(item => this.projectSvc.getProjectById(item.ProjectId).pipe(
map(project => ({ ...item, project }))
))
);

Here we defined myItemWithProject$ as an observable that begins whenever myItem$ emits, then used our new friend switchMap to make a call to get the parent Project. We then use map to simply return a copy of the Item object with an additional project property.

Here's a StackBlitz that shows this altogether.


Maybe you don't want a single combined object, you could obviously shape the data any way you want, maybe a single ViewModel object that has item and project properties. This is actually pretty common in Angular: ​

combineLatest is a great operator to handle this for you:

public vm$ : Observable<ViewModel> = combineLatest({ 
​item : this.myItem$,
​project : this.myProject$}
);

This approach allows you to use a single observable in your template and a single async pipe to unwrap it:

<div *ngIf ="vm$ | async as vm">

<h2> My Item </h2>
<p> {{ vm.item | json }} </p>

<h2> My Project </h2>
<p> {{ vm.project | json }} </p>

</div>

As your component becomes more complex, you can simply add more sources to the vm$:

public vm$ : Observable<ViewModel> = combineLatest({ 
​item : this.myItem$,
​project : this.myProject$,
source1 : this.source1$,
source2 : this.source2$
});

StackBlitz #2

Keeping it "observable all the way" can make things really concise and tidy. But, it requires that you understand what the operators are actually doing (the what and when).

It's usually not necessary to stash data outside of the observable stream. I find that when we reach for that as a solution, it's because we don't yet fully understand all of the operators provided to us by rxjs.



Am i right in thinking i should be doing concatMap instead of switchMap

The difference between the higher order mapping operators only comes into play when they receive more than one emission. This is because they all subscribe to inner sources and emit their emissions. The difference is the strategy they use when a new emission is received before the current source completes:

  • switchMap "switches" sources to only emit from the most recent source.

  • exhaustMap will ignore new emissions until the current source completes, so it only emits from the first source.

  • mergeMap will emit from all sources old and new.

  • concatMap is really just a special case of mergeMap where it will only allow one concurrent source at a time, but will eventually emit from all sources

So, in your case, I think switchMap is appropriate, because if the id changes, you no longer care about listening to emission about object with old ids.

ObservableCollection / Queue

The complexity of Queue<T>.Dequeue is O(1) where Collection<T>.RemoveAt is O(n). This makes using a native Queue<T> the better option. You can extend Queue<T> to add collection changed notifications.

You can use the following implementation:

public class ObservableQueue<TItem> : Queue<TItem>, INotifyCollectionChanged, INotifyPropertyChanged
{
public event PropertyChangedEventHandler? PropertyChanged;
public event NotifyCollectionChangedEventHandler? CollectionChanged;

new public void Enqueue(TItem item)
{
base.Enqueue(item);
OnPropertyChanged();
OnCollectionChanged(NotifyCollectionChangedAction.Add, item, this.Count - 1);
}

new public TItem Dequeue()
{
TItem removedItem = base.Dequeue();
OnPropertyChanged();
OnCollectionChanged(NotifyCollectionChangedAction.Remove, removedItem, 0);
return removedItem;
}

new public bool TryDequeue(out TItem? result)
{
if (base.TryDequeue(out result))
{
OnPropertyChanged();
OnCollectionChanged(NotifyCollectionChangedAction.Remove, result, 0);
return true;
}
return false;
}

new public void Clear()
{
base.Clear();
OnPropertyChanged();
OnCollectionChangedReset();
}

private void OnCollectionChanged(NotifyCollectionChangedAction action, TItem item, int index)
=> this.CollectionChanged?.Invoke(this, new NotifyCollectionChangedEventArgs(action, item, index));

private void OnCollectionChangedReset()
=> this.CollectionChanged?.Invoke(this, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));

private void OnPropertyChanged() => this.PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(this.Count)));
}

RxJs - how to make observable behave like queue

What you've asked about is pretty vague and general. Without a doubt, a more constrained scenario could probably look a whole lot simpler.

Regardless, here I create a pipeline that only lets transaction(): Observable be subscribed to once at a time.

Here's how that might look:

/****
* Represents what each transaction does. Isn't concerned about
* order/timing/'transactionInProgress' or anything like that.
*
* Here is a fake transaction that just takes 3-5 seconds to emit
* the string: `Hello ${name}`
****/
function transaction(args): Observable<string> {
const name = args?.message;
const duration = 3000 + (Math.random() * 2000);
return of("Hello").pipe(
tap(_ => console.log("starting transaction")),
switchMap(v => timer(duration).pipe(
map(_ => `${v} ${name}`)
)),
tap(_ => console.log("Ending transation"))
);
}

// Track transactions
let currentTransactionId = 0;
// Start transactions
const transactionSubj = new Subject<any>();
// Perform transaction: concatMap ensures we only start a new one if
// there isn't a current transaction underway
const transaction$ = transactionSubj.pipe(
concatMap(({id, args}) => transaction(args).pipe(
map(payload => ({id, payload}))
)),
shareReplay(1)
);

/****
* Begin a new transaction, we give it an ID since transactions are
* "hot" and we don't want to return the wrong (earlier) transactions,
* just the current one started with this call.
****/
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
transactionSubj.next({id: currentId, args});
return transaction$.pipe(
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}

// Queue up 3 transactions, each one will wait for the previous
// one to complete before it will begin.
beginTransaction({message: "Dave"}).subscribe(console.log);
beginTransaction({message: "Tom"}).subscribe(console.log);
beginTransaction({message: "Tim"}).subscribe(console.log);

Asynchronous Transactions

The current setup requires transactions to be asynchronous, or you risk losing the first one. The workaround for that is not simple, so I've built an operator that subscribes, then calls a function as soon as possible after that.

Here it is:

function initialize<T>(fn: () => void): MonoTypeOperatorFunction<T> {
return s => new Observable(observer => {
const bindOn = name => observer[name].bind(observer);
const sub = s.subscribe({
next: bindOn("next"),
error: bindOn("error"),
complete: bindOn("complete")
});
fn();
return {
unsubscribe: () => sub.unsubscribe
};
});
}

and here it is in use:

function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}

Aside: Why Use defer?

Consider re-writting beginTransaction:

function beginTransaction(args): Observable<any> {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
}

In this case, the ID is set at the moment you invoke beginTransaction.

// The ID is set here, but it won't be used until subscribed
const preppedTransaction = beginTransaction({message: "Dave"});

// 10 seconds later, that ID gets used.
setTimeout(
() => preppedTransaction.subscribe(console.log),
10000
);

If transactionSubj.next is called without the initialize operator, then this problem gets even worse as transactionSubj.next would also get called 10 seconds before the observable is subscribed to (You're sure to miss the output)

The problems continue:

What if you want to subscribe to the same observable twice?

const preppedTransaction = beginTransaction({message: "Dave"});
preppedTransaction.subscribe(
value => console.log("First Subscribe: ", value)
);
preppedTransaction.subscribe(
value => console.log("Second Subscribe: ", value)
);

I would expect the output to be:

First Subscribe: Hello Dave
Second Subscribe: Hello Dave

Instead, you get

First Subscribe: Hello Dave
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
Second Subscribe: Hello Dave

Because you don't get a new ID on subscribing, the two subscriptions share one ID. defer fixes this problem by not assigning an id until subscription. This becomes seriously important when managing errors in streams (letting you re-try an observable after it errors).

Is Observables is also executed in the same way like promises in the call stack?

  1. Do Observables have a storage location, similar to how Web API requests are saved in the Web API Environment before being moved to the Call Back Queue?

No.They are held in memory by a reference to them being held by the function that triggers them. For example, if you create an observable from a DOM event, RxJS will add an event listener to the DOM that holds a reference to the observable. If you delete the DOM element being listened to, the DOM element gets collected from memory by the garbage collector, allowing the event handler to be garbage collected, allowing the observable and its internal function chain to be garbage collected, in turn allowing the subscription callback function to be garbage collected - provided no reference to the observable is separately retained in user code.


  1. Is this the original observable or a copy of that has been moved from the callback queue to the Call stack?

Based on a negative answer to question 1, this question is not based on valid assumptions.


  1. How Observables execution is different from promises in the call stack?

Observables are not executed, they are objects. Internally they record a chain of functions which is called when an external function triggers the observable by providing data to it. Of course the external function may be a timer call back (think Scheduler) that repeats, or an event that may or not repeat, or a one time promise call back.

Promises have some similarities, including the fact that they are objects sitting in memory somewhere. Internally they hold two lists of call back functions to be called if the promise is fulfilled or rejected along with resolve and reject references to the next promise in a promise chain.

Like observables, individual Promise objects are held in memory by their resolve and reject functions which are particular to a promise instance . Like observables, promises may also be held in memory by references to a Promise object held in user code.


  1. Is it necessary for the call stack to be empty in order to execute the Observables?

No. If the observable is triggered by an asynchronous task, the call stack may be almost empty except for some code associated with providing data to the observable object and internal code responsible for running the chain of functions that run before calling the subscriber call back function. If the observable is triggered synchronously who knows what's on the call stack.

5 Is it possible to run normal function calls alongside Observables (i.e., it keeps fetching data from the live environment while the rest of the functions are run separately)?

Yes, except that if the observable is triggered synchronously from user code, it won't return to the user code until all subscriber callbacks have returned.

6 What happens if we don't unsubscribe the Observables and they continue to operate in the Call stack until the app is closed?

Observables aren't in the call stack and aren't held in memory by the subscription call back function. They get called from data source events or functions and create a stack frame in the call stack for the duration of the call.

If the data source doesn't release its reference to an observable, the observable object just sits in memory without doing anything. If the data source referencing the observable ceases to be held in memory, and no reference to the observable is held in user code, the observable becomes eligible for garbage collection from memory. The subscription callback function will then also become eligible for collection from memory if user code holds no reference to it (e.g. if it is an inline anonymous function).

Note that observables are supported by a JavaScript library. You are not receiving observables from general data base or fetch or HTTP APIs directly, you are receiving the observable from some intermediary software layer that is wrapping responses to requests made for you into a data source that triggers an observable returned to you.


My apologies if this answer contains inaccuracies, based as it is on a knowledge of JavaScript and the assumption that RxJS is "well behaved" enough to infer its logic from documentation.

Chain Observable Queue

Figured it out! May not be quite as elegant as the Promise chain, and I'm definitely open to suggestions to clean it up.

var trigger = undefined;
function obsQueue() {
if (!trigger || trigger.isStopped) {
trigger = new Rx.Subject();
return createObservable(trigger);
} else {
var lastTrigger = trigger;
var newTrigger = trigger = new Rx.Subject();
return lastTrigger.last().mergeMap(() => {
return createObservable(newTrigger);
});
}
}

var j = 0;
function createObservable(trigger) {
// In my use case, this creates and shows a dialog and returns an
// observable that emits and completes when an option is selected.
// We want to make sure we only create the next dialog when the previous
// one is closed.
console.log('creating');
return new Rx.Observable.of(++j).finally(() => {
trigger.next();
trigger.complete();
});
}

obsQueue().subscribe(result => {
console.log('first', result);
});
obsQueue().subscribe(result => {
console.log('second', result);
});
obsQueue().subscribe(result => {
console.log('third', result);
});
var timer = setTimeout(() => {
obsQueue().subscribe(result => {
console.log('fourth', result);
});
}, 1000);

// Output:
// creating
// first 1
// creating
// second 2
// creating
// third 3
// creating
// fourth 4

Rather than try to figure out how to chain them in order, I have each observable create its own trigger to let the next observable know when to create itself.

If all the triggers have been completed (setTimeout case, we queue up another one later), then the queue starts again.

Dynamically adding to the queue of RXJS Observables

When you do this:

public onSave(): void {
this.busy = true;
this.service.save()
.pipe(finalize(() => this.busy=false))
.subscribe(() => this.isInSync = true); <--------------
}

You can think of it as a code smell. When you call subscribe, that's where the reactive programming ends. This should not happen until the very last end of what you're trying. In this case, as you subscribe within a method that is called multiple time, you'll loose the ability to queue the requests as you'd like.

I think what you're trying to achieve can be represented with the following code (mix of mock for the HTTP call and actual stream logic):

const mockHttpRequest = (someParam: any) =>
of(`Result: ${someParam}`).pipe(delay(1000));

const change$$ = new Subject<any>();

const queue$ = change$$.pipe(concatMap(change => mockHttpRequest(change)));

const isSyncing$ = combineLatest([
change$$.pipe(
map((_, index) => index),
startWith(-1)
),
queue$.pipe(
map((_, index) => index),
startWith(-1)
)
]).pipe(
map(([changeIndex, queueIndex]) => changeIndex !== queueIndex),
distinctUntilChanged()
);

isSyncing$.subscribe(console.log);

A little bit of explanation on this:

const mockHttpRequest = (someParam: any) =>
of(`Result: ${someParam}`).pipe(delay(1000));

Simply mock an HTTP call with a delay

const change$$ = new Subject<any>();

The subject that you should call whenever a change happens

const queue$ = change$$.pipe(concatMap(change => mockHttpRequest(change)));

As its name says: This is the queue for all the HTTP calls. They'll happen one by one and won't be done in parallel.

const isSyncing$ = combineLatest([
change$$.pipe(
map((_, index) => index),
startWith(-1)
),
queue$.pipe(
map((_, index) => index),
startWith(-1)
)
]).pipe(
map(([changeIndex, queueIndex]) => changeIndex !== queueIndex),
distinctUntilChanged()
);

This observable will let you know the current status of the app: Whether it's syncing some data or not. To do this, we compare how many items have successfully gone out of the HTTP queue with how many saves attempts where made. If it's the same number we know we're in sync otherwise we're syncing.

The distinctUntilChanged is necessary because if we emit let say 2 changes really quickly and the second one we try to send it before the last one comes back, the observable isSyncing$ would emit twice false before emitting true once they're both done. With distinctUntilChanged we ensure that it emits only once false and only once true when that's the case.

Finally to test this we can do the following:

// simulate events happening when a user type at different times
// on the document to test our code
change$$.next('This is some initial text...');

setTimeout(() => {
change$$.next('This is some initial text... And now with some other edit...');
}, 1500);

setTimeout(() => {
change$$.next(
'This is some initial text... And now with some other edit... And a quick one before previous is finished processing...'
);
}, 1500);

setTimeout(() => {
change$$.next(
'This is some initial text... And now with some other edit... And a quick one before previous is finished processing... And a last one'
);
}, 4500);

and it should output

false
true
false
true
false
true
false

Note that the ones happening at the same time (1500) don't emit twice it's just that the status isSyncing$ stays true for a bit longer than the other as the 2 calls are made one after another

false
true
false
true <--- this one
false
true
false

If you want to play with this, here's a stackblitz: https://stackblitz.com/edit/rxjs-vtnk6z?devtoolsheight=60



Related Topics



Leave a reply



Submit