How to Limit Concurrency When Using ES6's Promise.all()

What is the best way to limit concurrency when using ES6's Promise.all()?

Note that Promise.all() doesn't trigger the promises to start their work; creating the promise itself does.
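
A quick illustration of that point (the setTimeout work is just a stand-in for a real async task):

// The executor runs the moment the promise is constructed,
// long before Promise.all() ever sees it.
const p = new Promise(resolve => {
  console.log('work started'); // logs immediately
  setTimeout(() => resolve('done'), 100);
});

// Promise.all() only waits for promises that already exist;
// it does not start any work itself.
Promise.all([p]).then(([result]) => console.log(result)); // 'done'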

With that in mind, one solution is to check, whenever a promise resolves, whether a new promise should be started or whether you're already at the limit.
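
A minimal sketch of that approach; the runWithLimit name and the tasks-as-functions shape are my own choices, not from any library:

// tasks: an array of functions that each return a promise when called.
const runWithLimit = (tasks, limit) => {
  const results = [];
  let next = 0;
  const worker = () => {
    if (next >= tasks.length) return Promise.resolve();
    const i = next++;
    return Promise.resolve(tasks[i]()).then(value => {
      results[i] = value;
      return worker(); // a freed slot immediately starts the next task
    });
  };
  // Start `limit` workers; like Promise.all(), one rejection rejects the run.
  const workers = Array.from({ length: Math.min(limit, tasks.length) }, worker);
  return Promise.all(workers).then(() => results);
};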

However, there is really no need to reinvent the wheel here. One library that you could use for this purpose is es6-promise-pool. From their examples:

var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here.
  // If there is work left to be done, return the next work item as a promise.
  // Otherwise, return null to indicate that all promises have been created.
  // Scroll down for an example.
}

// The number of promises to process simultaneously.
var concurrency = 3

// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool.
var poolPromise = pool.start()

// Wait for the pool to settle.
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
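
For instance, a producer might walk a list of URLs. This particular producer is my own illustration (it assumes a fetch-style API), not from the library's docs, but it follows the producer contract described in the comments above:

var urls = [
  'https://example.com/a',
  'https://example.com/b',
  'https://example.com/c'
]
var index = 0

var promiseProducer = function () {
  if (index < urls.length) {
    // Return the next work item as a promise.
    return fetch(urls[index++])
  }
  // No work left: return null so the pool stops asking.
  return null
}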

Limit concurrency and wait until all promises complete even if some reject

It's simple enough to implement yourself: make an array of functions that, when called, return the promise. Then implement a limiter function that takes functions from that array and calls them and, once each finishes, recursively calls the limiter again until the array is empty:

const request = (file) => new Promise((res, rej) => {
  console.log('requesting', file);
  setTimeout(() => {
    if (Math.random() < 0.5) {
      console.log('resolving', file);
      res(file);
    } else {
      console.log('rejecting', file);
      rej(file);
    }
  }, 1000 + Math.random() * 1000);
});
const files = [1, 2, 3, 4, 5, 6];

const makeRequests = files.map(file => () => request(file));
const results = [];
let started = 0;
const recurse = () => {
  const i = started++;
  const makeRequest = makeRequests.shift();
  return !makeRequest ? null : Promise.allSettled([makeRequest()])
    .then(result => {
      results[i] = result[0];
      return recurse();
    });
};
const limit = 2;
Promise.all(Array.from({ length: limit }, recurse))
  .then(() => {
    console.log(results);
  });
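
When run, this processes at most two files at a time, and results ends up holding one { status, value } or { status, reason } record per file, in the original order, because Promise.allSettled() never rejects.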

Limit concurrency of promises being run

I don't think there are any libraries to do this, but it's actually quite simple to implement yourself:

function sequential(fn) { // limitConcurrency(fn, 1)
  let q = Promise.resolve();
  return function(x) {
    const p = q.then(() => fn(x));
    q = p.reflect(); // Bluebird's .reflect(): wait for p to settle without rethrowing
    return p;
  };
}

For multiple concurrent requests it gets a little trickier, but can be done as well.

function limitConcurrency(fn, n) {
  if (n == 1) return sequential(fn); // optimisation
  let q = Promise.resolve();
  const active = new Set();
  const fst = t => t[0];
  const snd = t => t[1];
  return function(x) {
    function put() {
      const p = fn(x);
      const a = p.reflect().then(() => { // Bluebird's .reflect() again
        active.delete(a);
      });
      active.add(a);
      return [Promise.race(active), p];
    }
    if (active.size < n) {
      const t = put();
      q = fst(t);
      return snd(t);
    } else {
      const r = q.then(put);
      q = r.then(fst);
      return r.then(snd);
    }
  };
}
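
A hypothetical usage sketch; the upload function and file names are made up, and Bluebird is assumed because the code above relies on Bluebird's .reflect():

const Promise = require('bluebird');

// Pretend upload: settles after half a second.
const upload = file => Promise.delay(500).then(() => `uploaded ${file}`);

// At most 2 uploads in flight at once.
const limitedUpload = limitConcurrency(upload, 2);

['a.txt', 'b.txt', 'c.txt', 'd.txt'].forEach(file =>
  limitedUpload(file).then(msg => console.log(msg))
);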

Btw, you might want to have a look at the actor model and CSP. They can simplify dealing with such things, and there are a few JS libraries for them out there as well.
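
For a taste of the channel idea such libraries build on, here is a tiny async-semaphore sketch; it is purely illustrative and not taken from any particular library:

function semaphore(n) {
  let free = n;
  const waiters = [];
  const acquire = () =>
    free > 0
      ? (free--, Promise.resolve())
      : new Promise(resolve => waiters.push(resolve));
  const release = () => {
    const next = waiters.shift();
    if (next) next(); // hand the slot straight to the next waiter
    else free++;
  };
  // Wrap fn so that at most n calls run concurrently.
  return fn => (...args) =>
    acquire().then(() => fn(...args).finally(release));
}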

Example

import Promise from 'bluebird'

function sequential(fn) {
  var q = Promise.resolve();
  return (...args) => {
    const p = q.then(() => fn(...args))
    q = p.reflect() // Bluebird's .reflect(): wait for settlement without rethrowing
    return p
  }
}

async function _delayPromise (ms, str) {
  console.log(`${str} started`)
  await Promise.delay(ms)
  console.log(`${str} ended`)
  return str
}

let delayPromise = sequential(_delayPromise)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(200, "a:b")
  await delayPromise(300, "a:c")
}

async function b() {
  await delayPromise(400, "b:a")
  await delayPromise(500, "b:b")
  await delayPromise(600, "b:c")
}

a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done

What is the limit of Promise.all?

According to the V8 source code, there is a TooManyElementsInPromiseAll error message defined for Promise objects:

  T(TooManyElementsInPromiseAll, "Too many elements passed to Promise.all")

so there is indeed such a limit. For Promise.all, i.e. the C++ PromiseAll implementation, there is a concept of MaximumFunctionContextSlots and kPromiseAllResolveElementCapabilitySlot; here is the most interesting part of the source code:

// TODO(bmeurer): Move this to a proper context map in contexts.h?
// Similar to the AwaitContext that we introduced for await closures.
enum PromiseAllResolveElementContextSlots {
  // Remaining elements count
  kPromiseAllResolveElementRemainingSlot = Context::MIN_CONTEXT_SLOTS,

  // Promise capability from Promise.all
  kPromiseAllResolveElementCapabilitySlot,

  // Values array from Promise.all
  kPromiseAllResolveElementValuesArraySlot,

  kPromiseAllResolveElementLength
};

I would expect to see an error thrown like this:

ThrowTypeError(context, MessageTemplate::TooManyElementsInPromiseAll);

Here is the code that raises the TooManyElementsInPromiseAll error. Thanks to Clarence, who pointed me in the right direction!

BIND(&too_many_elements);
{
  // If there are too many elements (currently more than 2**21-1), raise a
  // RangeError here (which is caught directly and turned into a rejection)
  // of the resulting promise. We could gracefully handle this case as well
  // and support more than this number of elements by going to a separate
  // function and pass the larger indices via a separate context, but it
  // doesn't seem likely that we need this, and it's unclear how the rest
  // of the system deals with 2**21 live Promises anyways.
  Node* const result =
      CallRuntime(Runtime::kThrowRangeError, native_context,
                  SmiConstant(MessageTemplate::kTooManyElementsInPromiseAll));
  GotoIfException(result, &close_iterator, var_exception);
  Unreachable();
}

The check for this limit is here:

// Check if we reached the limit.
TNode<Smi> const index = var_index.value();
GotoIf(SmiEqual(index, SmiConstant(PropertyArray::HashField::kMax)),
       &too_many_elements);

so kMax (that is, PropertyArray::HashField::kMax) settles the question: the limit is currently 2**21 - 1 elements.
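
In plain numbers (the figure comes straight from the V8 comment quoted above):

console.log(2 ** 21 - 1); // 2097151 — the current maximum number of elements Promise.all accepts in V8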

How to throttle Promise.all() to 5 promises per second?

I hope this helps.

Note that this uses Promise.all to resolve all the requests, so if you have a large list of queries it will wait for every one of them to resolve, which may mean a lot of waiting in your code before you get all the responses. Also, if any one of the requests rejects, Promise.all will reject.

If you don't need all the results together, I suggest using something else instead, such as lodash's debounce or throttle, or a framework that handles this.

let items = [
  { name: 'item1' },
  { name: 'item2' },
  { name: 'item3' },
  { name: 'item4' },
  { name: 'item5' },
  { name: 'item6' }
];

// This is the api request that you send and that returns a promise
function apiCall(item) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(item.name), 1000);
  });
}

new Promise((resolve) => {
  let results = [];

  function sendReq(itemsList, iterate, apiCall) {
    setTimeout(() => {
      // slice itemsList to send requests according to the api limit
      let slicedArray = itemsList.slice(iterate * 5, iterate * 5 + 5);
      const result = slicedArray.map(item => apiCall(item)); // was an implicit global
      results = [...results, ...result];

      // This will resolve the promise when it reaches the last iteration
      if (iterate === Math.ceil(items.length / 5) - 1) {
        resolve(results);
      }
    }, 1000 * iterate); // runs every 1000ms (api limit of one batch per second)
  }

  // This iterates to split the array of requests into chunks of five items
  for (let i = 0; i < Math.ceil(items.length / 5); i++) { // `let` was missing
    sendReq(items, i, apiCall);
  }
}).then(Promise.all.bind(Promise)).then(console.log);
// Use Promise.all to wait for all requests to resolve
// To use it this way binding is required

Is there a limit to how many promises can or should run concurrently?

Promises themselves have no particular coded limits. They are just a notification system and you could have millions of them just fine (as long as you had enough memory to hold those Javascript objects).

Now, if a promise represents an underlying asynchronous operation (which they usually do), there could very well be some limits to how many of that specific type of asynchronous operation can be in flight at the same time. For example, at some point you might run into limits of how many requests a single host would accept from you at the same time. Or, you might run into local resources issues with zillions of connections somewhere.

For things like node.js disk I/O operations, the underlying disk I/O sub-system already has a queuing system so that only a small number of operations are actually running at once and the rest are queued.

So, to answer a question about how many concurrent operations you can have, it can only be analyzed and answered in the context of a specific type of asynchronous request and sometimes even a specific type of receiving host.

If you know you're processing a large or potentially large array of requests and you'll be sending a network request for every item in the array, then it is common to code a limit yourself to avoid overwhelming either local resources or the target host's resources. This is usually not done with a queue, but rather with code that just launches N requests and then, as one finishes, launches the next one, and so on. Both the Bluebird and Async libraries have methods for managing this for you; in Bluebird, it's the concurrency option for Promise.map() (see the sketch after the links below). I've also hand-coded loops that manage the number of concurrent connections several times myself, and here are links to some of that code:

Promise.all consumes all my RAM

Javascript - how to control how many promises access network in parallel

Make several requests to an API that can only handle 20 request a minute

Loop through an api get request with variable URL

Choose proper async method for batch processing for max requests/sec

Nodejs: Async request with a list of URL
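
As a quick illustration of the Bluebird option mentioned above: Promise.map() and its concurrency option are real Bluebird API, but the URL list and the use of a global fetch are my own assumptions.

const Promise = require('bluebird');

// Hypothetical list of URLs to fetch.
const urls = ['https://example.com/1', 'https://example.com/2', 'https://example.com/3'];

// At most 2 requests are in flight at any time; results keep input order.
Promise.map(urls, url => fetch(url).then(res => res.text()), { concurrency: 2 })
  .then(bodies => console.log('got', bodies.length, 'responses'));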


