Limited Parallelism with Async/Await in Typescript/Es7

Call async/await functions in parallel

You can await on Promise.all():

await Promise.all([someCall(), anotherCall()]);

To store the results:

let [someResult, anotherResult] = await Promise.all([someCall(), anotherCall()]);

Note that Promise.all fails fast, which means that as soon as one of the promises supplied to it rejects, then the entire thing rejects.

const happy = (v, ms) => new Promise((resolve) => setTimeout(() => resolve(v), ms))
const sad = (v, ms) => new Promise((_, reject) => setTimeout(() => reject(v), ms))

Promise.all([happy('happy', 100), sad('sad', 50)])
.then(console.log).catch(console.log) // 'sad'

Wait for async function and promises in it to finish

One option would be to have a loop that goes over the images and processes one after another. To then run multiple processings in parallel, start multiple loops:

  // Goes over the "data" array, calls and waits for each "task" and processes "runnerCount" tasks in parallel
function inParallel(task, data, runnerCount) {
let i = 0, results = [];

async function runner() {
while(i < data.length) {
const pos = i++; // be aware: concurrent modification of i
const entry = data[pos];
results[pos] = await task(entry);
}
}

const runners = Array.from({ length: runnerCount }, runner);

return Promise.all(runners).then(() => results);
}

To be used as:

  const delay = ms => new Promise(res => setTimeout(res, ms));

inParallel(async time => {
console.log(`Timer for ${time}ms starts`);
await delay(time);
console.log(`Timer for ${time}ms ends`);
}, [5000, 6000, 1000]/*ms*/, 2/*in parallel*/);

Slowdown due to non-parallel awaiting of promises in async generators

You are spending too much time waiting for I/O from different sources.

In normal promise code, you'd use Promise.all for this, however - people have a tendency to write code that waits for requests with generators. Your code does the following:

<-client     service->
countryFor..
''--..
''--..
''--.. country server sends response
..--''
..--''
..--''
getCommentDataFor
''--..
''--..
''--..
''--.. comment service returns response
..--''
..--''
..--''
authenticate
''--..
''--..
''--.. authentication service returns
..--''
..--''
..--''
Generator done.

Instead, it should be doing:

<-client     service->
countryFor..
commentsFor..''--..
authenticate..''--..''--..
''--..''--..''--.. country server sends response
''--..--''.. comment service returns response
..--''..--''.. authentication service returns response
..--''..--''..
..--''..--''..--''
..--''..--''
..--''
Generator done

Simply put, all your I/O should be done in parallel here.

To fix this, I'd use Promise.props. Promise.props takes an objects and waits for all its properties to resolve (if they are promises).

Remember - generators and promises mix and match really well, you simply yield promises:

Client.prototype.fetchCommentData = async(function* (user){
var country = countryService.countryFor(user.ip);
var data = api.getCommentDataFor(user.id);
var notBanned = authServer.authenticate(user.id).then(function(val){
if(!val) throw new AuthenticationError(user.id);
});
return Promise.props({ // wait for all promises to resolve
country : country,
comments : data,
notBanned: notBanned
});
});

This is a very common mistake people make when using generators for the first time.

ascii art shamelessly taken from Q-Connection by Kris Kowal

Why the execution doesn't stop at await (async/await JS)?

The fns array is a list of functions. Those functions return Promise objects. You are then passing that list of functions to your runPromises function, which appears to want to take a list of Promise objects as an argument. But you are passing it a list of functions.

My suggestions are either

  • change the runPromises function so that it actually calls prom() at some point (or just use a map, like const r = await Promise.all(queue.map((fn) => { return fn(); }));), or

  • change the definition of fns to something like let fns = [ asyncFuncLong(), asyncFuncShort(), asyncFuncLong(), ... ];.

Given a list of async functions, how can I process them n at a time?

You can achieve it using a plugin like bluebird which have a function allowing it : example here

const Promise = require('bluebird');
const _ = require('lodash');

let tasks = [...]; // over 10,000 items.

Promise.all(
_(tasks).map(task => {
return task.doAsyncThing();
})
.value();
);

Promise.map(
tasks,
task => {
return task.doAsyncThing();
},
{ concurrency: 1000 }
);

You can also create a function yourself that's gonna handle it. Here is one function I've made up myself, ofc it's improvable.

const resultPromises = [];
let nextToCall = 0;
let errorHappened = false;
/** * Function that get called when one promise got executed */function callbackOnePromiseGotExecuted({ error, result, allExecutedCallback, array,}) { resultPromises.push(result);
// Do nothing if an error got reported if (errorHappened) { return; }
// Return the error if (error) { allExecutedCallback(null, resultPromises);
errorHappened = true;
return; }
// Check if it was the last promise to execute if (resultPromises.length === array.length) { allExecutedCallback(null, resultPromises);
return; }
nextToCall += 1;
// Stop if we already acalled everything if (nextToCall > array.length) return;
// If it wasn't call a new promise array[nextToCall - 1].call() .then(ret => callbackOnePromiseGotExecuted({ error: null, result: ret, allExecutedCallback, array, })) .catch(e => callbackOnePromiseGotExecuted({ error: e, result: null, allExecutedCallback, array, }));}
/** * Handle the call of multiple promise with concurrency */function promiseWithConcurrencyCallback({ array, concurrencyNumber, allExecutedCallback,}) { for (let i = 0; i < concurrencyNumber; ++i) { array[nextToCall].call() .then(ret => callbackOnePromiseGotExecuted({ error: null, result: ret, allExecutedCallback, array, })) .catch(e => callbackOnePromiseGotExecuted({ error: e, result: null, allExecutedCallback, array, }));
nextToCall += 1; }
}
function promiseWithConcurrency(array, concurrencyNumber) { return new Promise((resolve, reject) => { promiseWithConcurrencyCallback({ array, concurrencyNumber, allExecutedCallback: (error, result) => { if (error) return reject(error);
return resolve(result); }, }); });}
const array = [ () => new Promise((resolve) => resolve('01')), () => new Promise((resolve) => resolve('02')), () => new Promise((resolve) => resolve('03')), () => new Promise((resolve) => resolve('04')), () => new Promise((resolve) => resolve('05')), () => new Promise((resolve) => resolve('06')),];
promiseWithConcurrency(array, 2) .then(rets => console.log('rets', rets)) .catch(error => console.log('error', error));


Related Topics



Leave a reply



Submit