What Is "Callback Hell" and How and Why Does Rx Solve It

What is callback hell and how and why does RX solve it?

1) What is a "callback hell" for someone who does not know javascript and node.js ?

This other question has some examples of Javascript callback hell: How to avoid long nesting of asynchronous functions in Node.js

The problem in Javascript is that the only way to "freeze" a computation and have the "rest of it" execute latter (asynchronously) is to put "the rest of it" inside a callback.

For example, say I want to run code that looks like this:

x = getData();
y = getMoreData(x);
z = getMoreData(y);
...

What happens if now I want to make the getData functions asynchronous, meaning that I get a chance to run some other code while I am waiting for them to return their values? In Javascript, the only way would be to rewrite everything that touches an async computation using continuation passing style:

getData(function(x){
getMoreData(x, function(y){
getMoreData(y, function(z){
...
});
});
});

I don't think I need to convince anyone that this version is uglier than the previous one. :-)

2) When (in what kind of settings) does the "callback hell problem" occur?

When you have lots of callback functions in your code! It gets harder to work with them the more of them you have in your code and it gets particularly bad when you need to do loops, try-catch blocks and things like that.

For example, as far as I know, in JavaScript the only way to execute a series of asynchronous functions where one is run after the previous returns is using a recursive function. You can't use a for loop.

// we would like to write the following
for(var i=0; i<10; i++){
doSomething(i);
}
blah();

Instead, we might need to end up writing:

function loop(i, onDone){
if(i >= 10){
onDone()
}else{
doSomething(i, function(){
loop(i+1, onDone);
});
}
}
loop(0, function(){
blah();
});

//ugh!

The number of questions we get here on StackOverflow asking how to do this kind of thing is a testament to how confusing it is :)

3) Why does it occur ?

It occurs because in JavaScript the only way to delay a computation so that it runs after the asynchronous call returns is to put the delayed code inside a callback function. You cannot delay code that was written in traditional synchronous style so you end up with nested callbacks everywhere.

4) Or can "callback hell" occur also in a single threaded application?

Asynchronous programming has to do with concurrency while a single-thread has to do with parallelism. The two concepts are actually not the same thing.

You can still have concurrent code in a single threaded context. In fact, JavaScript, the queen of callback hell, is single threaded.

What is the difference between concurrency and parallelism?

5) could you please also show how RX solves the "callback hell problem" on that simple example.

I don't know anything about RX in particular, but usually this problem gets solved by adding native support for asynchronous computation in the programming language. The implementations can vary and include: async, generators, coroutines, and callcc.

In Python we can implement that previous loop example with something along the lines of:

def myLoop():
for i in range(10):
doSomething(i)
yield

myGen = myLoop()

This is not the full code but the idea is that the "yield" pauses our for loop until someone calls myGen.next(). The important thing is that we could still write the code using a for loop, without needing to turn out logic "inside out" like we had to do in that recursive loop function.

Why does the callback hell works and the async/await doesnt? (in this particular case that i am testing)

Now the first await blocks the main thread while the other two keeps showing lastly. What am i doing wrong?

The reason that ameno gets logged up front is that you have that log statement before any awaits. To evaluate the line await consoleMusical("ameno"); it has to execute consoleMusical('ameno'), get its return value, and only then can it await.

The other console.logs happen after the await, so they will get queued up as microtasks to run after the rest of the currently-executing code.

In contrast, all of the console.logs of callBackHell are inside the first promise. So all of them will be queued up as microtasks.


This problem will go away if you use an asynchronous sleep rather than a synchronous one. And as a bonus, you don't lock up the browser:

function sleep(seconds) {
return new Promise(resolve => {
setTimeout(resolve, seconds * 1000);
});
}

const consoleMusical = async (letra) => {
await sleep(1);
console.log(letra);
};

async function codeBlockerAwait() {
await consoleMusical("ameno");
await consoleMusical("dorime");
await consoleMusical("latiereno");
}

console.log(1);
codeBlockerAwait();
console.log(2);
console.log(3);

Replace callback hell with observable

you could use the flatMap operator instead to chain results. Have a look here : RxJS Promise Composition (passing data). Basically chaining promises is the same as chaining flatMap. That is :

pl().then(p2).then(p3).then(console.log);

is similar to :

 Rx.Observable.just()
.flatMap(p1)
.flatMap(p2)
.flatMap(p3);

So the transition from promises to observables is simple. If you have a function which operates with a callback instead of a promise, I can think of two options :

  • Try to use Rx.Observable.fromCallback or Rx.Observable.fromNodeCallback
  • Wrap the callback in an observable of your own. Have a look here too : rx.js how to chain observables

For instance, function asyncCall (param, cb) would lead to something like :

Rx.Observable.create (function(observer){
asyncCall(param, function cb(err, data){
if (err) {observer.onError(err)}
else {
observer.onNext(x);
// The following is supposing your callback is only called once, so there is no more data
observer.onCompleted();
}
}
})

Once that's done you can use flatMap (or concatMap if order of execution matters) as shown before.

How to avoid using 'callback hell' and turn to Promises?

There are different ways to do this. I won't cover them all.

OPTION 1: AWAIT PROMISE

For this example you will want to run the code inside an async function to take advantage of the await keyword.

Update your SMTP and PM2 functions to return a Promise. Inside the promise will handle the logic and resolve("done") releases the promise so that code can move on.

module.exports = ({ host, port, username, key }) => {
return new Promise((resolve) => {
...
resolve("done");
});
}

Now you can update the execution code to take advantage of the promises:

const fs = require("fs");
const sftp = require("./_uploader/sftp");
const pm2 = require("./_uploader/pm2");

const credentials = {
host: "",
port: 123,
username: "",
key: "key",
};

const runApp = async () => {
await sftp(credentials);
await pm2(credentials);
console.log("done");
}

runApp();

OPTION 2: CHAIN PROMISES

Another way to do this is by chaining the Promises. I opt not to do this very often because it can become a mess of nested logic.

const fs = require("fs");
const sftp = require("./_uploader/sftp");
const pm2 = require("./_uploader/pm2");

const credentials = {
host: "",
port: 123,
username: "",
key: "key",
};

sftp(credentials).then(result1 => {
pm2(credentials).then(result2 => {
console.log("done");
});
});

OPTION 3: PROMISE ALL

Another option is to use Promise.all

const fs = require("fs");
const sftp = require("./_uploader/sftp");
const pm2 = require("./_uploader/pm2");

const credentials = {
host: "",
port: 123,
username: "",
key: "key",
};

Promise.all([
sftp(credentials),
pm2(credentials)
]).then(result => {
console.log("done");
});

how to escape this callback hell using RxJava

When you want to use consecutive subscriptions, you should use flatMap operator. This way your problem can be simplified to this:

subscription = provider.removeGeofences(mGeofencePendingIntent)
.flatMap(new Func1<Status, Observable<Status>>() {
@Override
public Observable<Status> call(Status status) {
//after the first subscription
Prefs.geofence.clear();
return statusObservable;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Status>() {
@Override
public void call(Status status) {
//after the second subscription
Prefs.geofence.set(...);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d(TAG, "onError() called with: " + "e = [" + e + "]");
}
});

How to replace 'if statement' with rx-java for avoiding callback hell?

There are loads of ways of doing this and it really depends on your use case. In general I wouldn't want to split into 2 streams, as that makes your code less readable. Also, I'm not sure what benefit you get from the flatMap call. There's nothing wrong with doing if stuff within a map call.

Here are a few options:

1 - For adding logging (a bit like your print lines), I use doOnEach()

apiResultStream
.doOnEach(next -> {
if (next) logger.info("Logging true " + next);
else logger.info(Logging false " + next);
})
.subscribe(....

2 - The work you're doing is part of your stream, and you're going to want to do more work on the stream later - use map

apiResultStream
.map(next -> {
if (next) doSomeCallWithNextWhenTrue(next);
else doSomeCallwithNextWhenFalse(next);
})
.subscribe(...

3 - If this is work you want to do at the end of the pipeline - IE after all transformational or other stream like work has completed, then do it in the subscribe call.

apiResultStream
.subscribe(next -> {
if (next) doSomeCallWithNextWhenTrue(next);
else doSomeCallwithNextWhenFalse(next);
});

The problem is - with such a simple use case, it's difficult to suggest the best option, but I appreciate that in learning Rx, working out how to do conditional statements can seem confusing. In general, I just use map or flatMap when I'm calling another method that returns an Observable and do my logic in there.

Update

Still not sure why you're splitting your streams. Unless you start getting clever with different threads, the first subscribe call is going to block the second which is probably not what you want. Also, if you don't call subscribe more than once, then you don't need the cache() call.

There's nothing wrong with using an if statement within an map / flatmap / subscribe. Especially if it makes your code more readable.

I would do the following:

apiResultStream
.flatMap(result -> {
if (result.response == true) {
return callSuccessApi(result)
}
else {
return callFailureApi(result)
})
//Do any more calls you need
.subscribe(...

So much cleaner.

I'm a bit confused by your System.out.println calls in subscribe. Is this there for debug or logging purposes? If so, just do that within the above flatMap in the if statement.

Hope this helps,

Will



Related Topics



Leave a reply



Submit