Coordinating Parallel Execution in Node.js


Nothing is truly parallel in node.js since it is single threaded. However, multiple events can be scheduled and run in a sequence you can't determine beforehand. And some things like database access are actually "parallel" in that the database queries themselves are run in separate threads but are re-integrated into the event stream when completed.

So, how do you schedule a callback on multiple event handlers? Well, this is one common technique used in animations in browser-side JavaScript: use a variable to track completion.

This sounds like a hack, and it is. It also sounds potentially messy, leaving a bunch of global variables around to do the tracking, and in a lesser language it would be. But in JavaScript we can use closures:

function fork (async_calls, shared_callback) {
    var counter = async_calls.length;
    var callback = function () {
        counter--;
        if (counter == 0) {
            shared_callback();
        }
    }

    for (var i = 0; i < async_calls.length; i++) {
        async_calls[i](callback);
    }
}

// usage:
fork([A,B,C],D);

In the example above we keep the code simple by assuming the async and callback functions require no arguments. You can of course modify the code to pass arguments to the async functions and have the callback function accumulate results and pass them to the shared_callback function.


Additional answer:

Actually, even as is, that fork() function can already pass arguments to the async functions using a closure:

fork([
    function (callback) { A(1, 2, callback); },
    function (callback) { B(1, callback); },
    function (callback) { C(1, 2, callback); }
], D);

The only thing left to do is to accumulate the results from A, B and C and pass them on to D.


Even more additional answer:

I couldn't resist. Kept thinking about this during breakfast. Here's an implementation of fork() that accumulates results (usually passed as arguments to the callback function):

function fork (async_calls, shared_callback) {
    var counter = async_calls.length;
    var all_results = [];
    function makeCallback (index) {
        return function () {
            counter--;
            var results = [];
            // we use the arguments object here because some callbacks
            // in Node pass in multiple arguments as result.
            for (var i = 0; i < arguments.length; i++) {
                results.push(arguments[i]);
            }
            all_results[index] = results;
            if (counter == 0) {
                shared_callback(all_results);
            }
        }
    }

    for (var i = 0; i < async_calls.length; i++) {
        async_calls[i](makeCallback(i));
    }
}

That was easy enough. This makes fork() fairly general purpose; it can be used to synchronize multiple non-homogeneous events.

Example usage in Node.js:

// Read 3 files in parallel and process them together:

var fs = require('fs');

function A (c) { fs.readFile('file1', c); }
function B (c) { fs.readFile('file2', c); }
function C (c) { fs.readFile('file3', c); }

function D (result) {
    var file1data = result[0][1];
    var file2data = result[1][1];
    var file3data = result[2][1];

    // process the files together here
}

fork([A,B,C],D);

Update

This code was written before the existence of libraries like async.js or the various promise-based libraries. I'd like to believe that async.js was inspired by this, but I don't have any proof of it. Anyway, if you're thinking of doing this today, take a look at async.js or promises. Just consider the answer above a good explanation/illustration of how things like async.parallel work.

For completeness' sake, the following is how you'd do it with async.parallel:

var async = require('async');

async.parallel([A,B,C],D);

Note that async.parallel works almost exactly the same as the fork function we implemented above. The main difference is that it passes an error as the first argument to D and the results as the second argument, as per the node.js convention.
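As a sketch of that convention, here's how the fork() from above could be adapted to error-first callbacks. This is a hypothetical variant for illustration (not async.js's actual source), with stand-in async tasks:

```javascript
// Hypothetical error-first variant of the fork() above.
function forkErrorFirst (async_calls, shared_callback) {
    var counter = async_calls.length;
    var all_results = [];
    var done = false;
    function makeCallback (index) {
        return function (err, result) {
            if (done) return;
            if (err) {          // first error wins, like async.parallel
                done = true;
                return shared_callback(err);
            }
            all_results[index] = result;
            counter--;
            if (counter === 0) {
                done = true;
                shared_callback(null, all_results);
            }
        };
    }
    for (var i = 0; i < async_calls.length; i++) {
        async_calls[i](makeCallback(i));
    }
}

// Stand-in async tasks (assumptions, in place of real I/O):
function A (cb) { setImmediate(function () { cb(null, 'a'); }); }
function B (cb) { setImmediate(function () { cb(null, 'b'); }); }
function C (cb) { setImmediate(function () { cb(null, 'c'); }); }

forkErrorFirst([A, B, C], function D (err, results) {
    if (err) return console.error(err);
    console.log(results); // [ 'a', 'b', 'c' ]
});
```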

Using promises, we'd write it as follows:

// Assuming A, B & C return a promise instead of accepting a callback
// (note that Promise.all takes promises, so the functions are invoked):

Promise.all([A(), B(), C()]).then(D);
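As a self-contained sketch of that promise version, with stand-in tasks in place of real I/O such as fs.promises.readFile:

```javascript
// Stand-in promise-returning tasks (assumptions, not real file reads):
function A () { return Promise.resolve('file1 data'); }
function B () { return Promise.resolve('file2 data'); }
function C () { return Promise.resolve('file3 data'); }

function D (results) {
    // results arrive in the same order the promises were given,
    // regardless of which one settled first
    console.log(results); // [ 'file1 data', 'file2 data', 'file3 data' ]
    return results.join(', ');
}

// Promise.all takes promises, so the functions are invoked here:
Promise.all([A(), B(), C()]).then(D);
```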

Parallelizing tasks in Node.js

2022 notice: this answer predates the introduction of worker threads in Node.js

How do I make this be actually parallel?

First, you won't really be running in parallel while in a single node application. A node application runs on a single thread and only one event at a time is processed by node's event loop. Even when running on a multi-core box you won't get parallelism of processing within a node application.

That said, you can get processing parallelism on a multicore machine by forking the code into separate node processes or by spawning child processes. This, in effect, allows you to create multiple instances of node itself and to communicate with those processes in different ways (e.g. stdout, the process fork IPC mechanism). Additionally, you could separate the functions (by responsibility) into their own node app/server and call them via RPC.

What is the thing done typically by async code to not block the caller (when working with NodeJS)? Is it starting a child process?

It is not starting a new process. Underneath, when async.parallel is used in node.js, it is using process.nextTick(). And nextTick() allows you to avoid blocking the caller by deferring work onto a new stack, so you can interleave CPU-intensive tasks, etc.
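A small sketch of what that deferral looks like (the function name asyncAdd is hypothetical):

```javascript
// asyncAdd defers its callback with process.nextTick, so the caller's
// current stack finishes before the callback runs.
function asyncAdd (a, b, callback) {
    process.nextTick(function () {
        callback(null, a + b);
    });
}

asyncAdd(1, 2, function (err, sum) {
    console.log('sum:', sum); // runs second
});
console.log('caller not blocked'); // runs first
```

Note that this only changes *when* the work runs; it still runs on the same single thread.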

Long story short

Node doesn't make it easy "out of the box" to achieve multiprocessor concurrency. Instead, Node gives you a non-blocking design and an event loop that leverages a single thread without shared memory. With no threads sharing data/memory, no locks are needed: Node is lock-free. One Node process leverages one thread, and this makes Node both safe and powerful.

When you need to split work up among multiple processes then use some sort of message passing to communicate with the other processes / servers. e.g. IPC/RPC.


For more see:

Awesome answer from SO on What is Node.js... with tons of goodness.

Understanding process.nextTick()

Javascript Running two loops in parallel

Only async functions run "async" (without workers), and even then they merely interleave on the event loop. The awaits below let the two loops take turns:

async function loop1 () {
    for (var i = 10000; i < 20000; i++) {
        await console.log(i);
    }
}

async function loop2 () {
    for (var i = 0; i < 10000; i++) {
        await console.log(i);
    }
}

loop1();
loop2();

node.js : Concurrency limited parallel execution with time interval (rate limiting)

You're invoking the function. You meant to pass a function, but you're passing the return value of that function because you accidentally called it inline. Did you mean:

setTimeout(function () {
    sendBatchReq(docs, fromIndex + batchSize, batchSize);
}, timeout);

Async parallel requests are running sequentially

As you have discovered, async.parallel() can only parallelize operations that are themselves asynchronous. If the operations are synchronous, then because of the single threaded nature of node.js, the operations will run one after another, not in parallel. But, if the operations are themselves asynchronous, then async.parallel() (or other async methods) will start them all at once and coordinate the results for you.

Here's a general idea using async.map(). I used async.map() because the idea there is that it takes an array as input and produces an array of results in the same order as the original, but runs all the requests in parallel which seems to line up with what you want:

var async = require("async");
var request = require("request");

// create list of URLs
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    var index = Math.round(Math.random() * 495) + 1;
    var url = 'http://localhost:3001/generate?file=' + index;
    lookup_list.push(url);
}

async.map(lookup_list, function (url, callback) {
    // iterator function
    request(url, function (error, response, body) {
        if (!error && response.statusCode == 200) {
            var data = JSON.parse(body);
            // do any further processing of the data here
            callback(null, data);
        } else {
            callback(error || response.statusCode);
        }
    });
}, function (err, results) {
    // completion function
    if (!err) {
        // process all results in the array here
        console.log(results);
        for (var i = 0; i < results.length; i++) {
            // do something with results[i]
        }
    } else {
        // handle error here
    }
});

And, here's a version using Bluebird promises and somewhat similarly using Promise.map() to iterate the initial array:

var Promise = require("bluebird");
var request = Promise.promisifyAll(require("request"), {multiArgs: true});

// create list of URLs
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    var index = Math.round(Math.random() * 495) + 1;
    var url = 'http://localhost:3001/generate?file=' + index;
    lookup_list.push(url);
}

Promise.map(lookup_list, function (url) {
    return request.getAsync(url).spread(function (response, body) {
        if (response.statusCode !== 200) {
            throw response.statusCode;
        }
        return JSON.parse(body);
    });
}).then(function (results) {
    console.log(results);
    for (var i = 0; i < results.length; i++) {
        // process results[i] here
    }
}, function (err) {
    // process error here
});

Running parallel execution of functions using Promise.all

I think it can be more readable to extract the inner function and then chain them together:

Promise.all(firstPromises)
    .then(transformToSecondPromises)
    .then(promises => Promise.all(promises)) // Promise.all needs its `this`, so don't pass it bare
    .then(values => { /* do later operation */ })
    .catch(error => { debug(`Error in promises ${error}`) })

function transformToSecondPromises ([all, inventory, nearby]) {
    const fileKey1 = folderName + '/' + all.QueryExecutionId + '.csv';
    const fileName1 = all.QueryExecutionId + '.csv';
    const fileKey2 = folderName + '/' + inventory.QueryExecutionId + '.csv';
    const fileName2 = inventory.QueryExecutionId + '.csv';
    const fileKey3 = folderName + '/' + nearby.QueryExecutionId + '.csv';
    const fileName3 = nearby.QueryExecutionId + '.csv';
    return [
        s3Service.s3StreamDownload(bucketName, fileKey1, fileName1),
        s3Service.s3StreamDownload(bucketName, fileKey2, fileName2),
        s3Service.s3StreamDownload(bucketName, fileKey3, fileName3)
    ];
}

Node.js async, but only handle first positive/defined result

Use Promise.any([ap, bp]).
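A sketch of that approach (Node 15+), with stand-in promises in place of the real API and Redis calls:

```javascript
// ap fulfills later; bp rejects immediately. Promise.any ignores
// rejections and settles with the first *fulfilled* value; it only
// rejects (with an AggregateError) if every promise rejects.
var ap = new Promise(function (resolve) {
    setTimeout(function () { resolve('api result'); }, 20);
});
var bp = Promise.reject(new Error('redis down'));

Promise.any([ap, bp])
    .then(function (first) { console.log(first); })        // 'api result'
    .catch(function (err) { console.error(err.errors); }); // only if all reject
```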

The following is a possible way to do it without promises. It is untested but should meet the requirements.

To meet the requirement of returning the first success and not just the first completion, I keep a count of the number of completions expected, so that if an error occurs it can be ignored unless it is the last one.

function asyncMinimum (a, cb) {
    var triggered = false;
    var completions = a.length;
    function callback (err, data) {
        completions--;
        if (err && completions !== 0) return;
        if (triggered) return;
        triggered = true;
        return cb(err, data);
    }
    a.map(function (f) { return f(callback); });
}

asyncMinimum([

    function apiRequest (cb) {
        request(opts, function (err, response, body) {
            cb(err, body.result.hit);
        });
    },
    function redisRequest (cb) {
        client.get("some_key", function (err, reply) {
            cb(err, reply.result.hit);
        });
    }],

function minimumCompleted (err, result) {

    // this minimumCompleted final callback function will only be fired once,
    // and will be fired by whichever of the above functions
    // first had a defined value for body.result.hit

});


