Limit Concurrency of Promises Being Run

What is the best way to limit concurrency when using ES6's Promise.all()?

Note that Promise.all() doesn't trigger the promises to start their work; creating the promise itself does.
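A quick illustration of this (a minimal sketch; the setTimeout stands in for real async work):

// The executor runs immediately, at construction time.
const p = new Promise((resolve) => {
  console.log('work started'); // logs before Promise.all() is ever called
  setTimeout(() => resolve('done'), 100);
});

// Promise.all() merely waits for the already-running promise.
Promise.all([p]).then(([result]) => console.log(result));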

With that in mind, one solution would be to check, whenever a promise resolves, whether a new promise should be started or whether you're already at the limit.
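A minimal sketch of that approach (the `tasks` array of promise-returning functions and the `limit` parameter are illustrative names, not from any library):

function runWithLimit(tasks, limit) {
  return new Promise((resolve, reject) => {
    const results = [];
    let next = 0;   // index of the next task to start
    let active = 0; // number of tasks currently running

    function startNext() {
      // All tasks started and none still running: we're done.
      if (next >= tasks.length && active === 0) return resolve(results);
      // Start tasks until we hit the limit or run out of work.
      while (active < limit && next < tasks.length) {
        const i = next++;
        active++;
        tasks[i]().then((value) => {
          results[i] = value;
          active--;
          startNext(); // a slot freed up, start another task
        }, reject);
      }
    }

    startNext();
  });
}

You'd call it with functions that create the promises lazily, e.g. runWithLimit(urls.map(url => () => fetch(url)), 3).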

However, there is really no need to reinvent the wheel here. One library that you could use for this purpose is es6-promise-pool. From their examples:

var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here.
  // If there is work left to be done, return the next work item as a promise.
  // Otherwise, return null to indicate that all promises have been created.
  // Scroll down for an example.
}

// The number of promises to process simultaneously.
var concurrency = 3

// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool.
var poolPromise = pool.start()

// Wait for the pool to settle.
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})

Limit concurrency of promises being run

I don't think there are any libraries to do this, but it's actually quite simple to implement yourself:

function sequential(fn) { // limitConcurrency(fn, 1)
  let q = Promise.resolve();
  return function (x) {
    const p = q.then(() => fn(x));
    q = p.reflect(); // .reflect() is Bluebird's; it settles without ever rejecting
    return p;
  };
}

For multiple concurrent requests it gets a little trickier, but can be done as well.

function limitConcurrency(fn, n) {
  if (n == 1) return sequential(fn); // optimisation
  let q = Promise.resolve();
  const active = new Set();
  const fst = t => t[0];
  const snd = t => t[1];
  return function (x) {
    function put() {
      const p = fn(x);
      const a = p.reflect().then(() => {
        active.delete(a);
      });
      active.add(a);
      return [Promise.race(active), p];
    }
    if (active.size < n) {
      const t = put();
      q = fst(t);
      return snd(t);
    } else {
      const r = q.then(put);
      q = r.then(fst);
      return r.then(snd);
    }
  };
}

By the way, you might want to have a look at the actor model and CSP. They can simplify dealing with such things, and there are a few JS libraries for them out there as well.

Example

import Promise from 'bluebird'

function sequential(fn) {
  var q = Promise.resolve();
  return (...args) => {
    const p = q.then(() => fn(...args))
    q = p.reflect()
    return p
  }
}

async function _delayPromise (ms, str) { // Bluebird's Promise.delay takes milliseconds
  console.log(`${str} started`)
  await Promise.delay(ms)
  console.log(`${str} ended`)
  return str
}

let delayPromise = sequential(_delayPromise)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(200, "a:b")
  await delayPromise(300, "a:c")
}

async function b() {
  await delayPromise(400, "b:a")
  await delayPromise(500, "b:b")
  await delayPromise(600, "b:c")
}

a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done

Limit concurrency and wait until all promises complete even if some reject

It's simple enough to implement it yourself - make an array of functions that, when called, return the Promise. Then implement a limiter function that takes functions from that array and calls them, and once finished, recursively calls the limiter again until the array is empty:

const request = (file) => new Promise((res, rej) => {
  console.log('requesting', file);
  setTimeout(() => {
    if (Math.random() < 0.5) {
      console.log('resolving', file);
      res(file);
    } else {
      console.log('rejecting', file);
      rej(file);
    }
  }, 1000 + Math.random() * 1000);
});
const files = [1, 2, 3, 4, 5, 6];

const makeRequests = files.map(file => () => request(file));
const results = [];
let started = 0;
const recurse = () => {
  const i = started++;
  const makeRequest = makeRequests.shift();
  return !makeRequest ? null : Promise.allSettled([makeRequest()])
    .then(result => {
      results[i] = result[0];
      return recurse();
    });
};
const limit = 2;
Promise.all(Array.from({ length: limit }, recurse))
  .then(() => {
    console.log(results);
  });

How can I limit Q promise concurrency?

I have a library that does this for you https://github.com/ForbesLindesay/throat

You can use it via browserify or download the standalone build from brcdn (https://www.brcdn.org/?module=throat&version=latest) and add it as a script tag.

Then (assuming the Promise constructor is polyfilled or implemented in your environment) you can do:

// remove this line if using the standalone build
var throat = require('throat');

function limitConcurrency(promiseFactory, limit) {
  var fn = throat(promiseFactory, limit);
  return function () {
    return Q(fn.apply(this, arguments));
  };
}

You could just call throat(promiseFactory, limit) directly, but that would return a plain promise rather than a Q promise.

I also really like using it with array.map.

// only allow 3 parallel downloads
var downloadedItems = Q.all(items.map(throat(download, 3)));

Is there a limit to how many promises can or should run concurrently?

Promises themselves have no particular coded limits. They are just a notification system and you could have millions of them just fine (as long as you had enough memory to hold those Javascript objects).

Now, if a promise represents an underlying asynchronous operation (which they usually do), there could very well be some limits to how many of that specific type of asynchronous operation can be in flight at the same time. For example, at some point you might run into limits of how many requests a single host would accept from you at the same time. Or, you might run into local resources issues with zillions of connections somewhere.

For things like node.js disk I/O operations, the underlying disk I/O sub-system already has a queuing system so that only a small number of operations are actually running at once and the rest are queued.

So, the question of how many concurrent operations you can have can only be analyzed and answered in the context of a specific type of asynchronous request, and sometimes even a specific type of receiving host.

If you know you're processing a large or potentially large array of requests and you'll be sending a network request for every item in the array, then it is common to code a limit yourself to avoid overwhelming either local resources or the target host's resources. This is usually not done with a queue, but rather with code that just launches N requests and then, as one finishes, launches the next one, and so on. Both the Bluebird and Async libraries have methods for managing this for you; in Bluebird, it's the concurrency option for Promise.map() (a minimal sketch of it follows the list below). I've also hand-coded loops that manage the number of concurrent connections several times myself, and here are links to some of that code:

  • Promise.all consumes all my RAM
  • Javascript - how to control how many promises access network in parallel
  • Make several requests to an API that can only handle 20 request a minute
  • Loop through an api get request with variable URL
  • Choose proper async method for batch processing for max requests/sec
  • Nodejs: Async request with a list of URL
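For reference, here is a minimal sketch of the Bluebird approach mentioned above, reusing the `items` and `download` names from the throat example:

const Promise = require('bluebird');

// Bluebird's Promise.map starts at most `concurrency` mapper calls
// at once, launching the next one as soon as a running one settles.
Promise.map(items, (item) => download(item), { concurrency: 3 })
  .then((results) => console.log('all downloads done', results));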

Concurrency limit in Q promises - node

Thanks to Dan, his answer, and his help integrating it with my code, it can be done using his gist and a snippet like this:

var qlimit = require('../libs/qlimit');

var test = function (id) {
  console.log('Running ' + id);
  return Q.nfcall(request, 'some dummy url which takes some time to process, for example a php file with sleep(5)').spread(function (response, body) {
    console.log('Response ' + id);
    return body;
  });
};

test = qlimit.limitConcurrency(test, 1);

var data = [0, 1, 2];

data.forEach(function (id) {
  console.log('Starting item ' + id);
  Q.all([test(id)]);
});

This way you get something like:

  • Starting item 0
  • Starting item 1
  • Starting item 2
  • Running 0
  • Response 0
  • Running 1
  • Response 1
  • Running 2
  • Response 2

That is clearly one request at a time.

The whole point that I was missing in the implementation is that you need to wrap the function with limitConcurrency BEFORE starting the loop, not inside it.
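To make the distinction concrete, here is a sketch reusing the `qlimit` helper from the snippet above:

// Wrong: wrapping inside the loop creates a fresh limiter per item,
// so each call has its own counter and nothing is actually limited.
data.forEach(function (id) {
  var limited = qlimit.limitConcurrency(test, 1);
  limited(id);
});

// Right: wrap once, before the loop, so every call shares one limiter.
var limitedTest = qlimit.limitConcurrency(test, 1);
data.forEach(function (id) {
  limitedTest(id);
});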

Limit Q promise concurrency in Node js

You can request a new url in a then() block:

myFunction(urls[0]).then(function (result) {
  myFunction(urls[1]).then(function (result) {
    myFunction(urls[2]).then(function (result) {
      // ...
    });
  });
});

Of course, this hardcodes the order. To make the behaviour dynamic, I'd maintain a queue and dequeue a single url once a promise is resolved, then make another request, and perhaps keep a hash object relating urls to results.
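A minimal sketch of that queue idea (reusing `myFunction` from above; `processQueue` and its internals are illustrative names):

function processQueue(urls, limit) {
  const queue = urls.slice(); // copy so we can dequeue
  const results = {};         // hash relating urls to results

  function next() {
    if (queue.length === 0) return Promise.resolve();
    const url = queue.shift();
    return myFunction(url).then(function (result) {
      results[url] = result;
      return next(); // dequeue another url once this one resolved
    });
  }

  // Kick off up to `limit` chains; each pulls the next url as it finishes.
  const starters = Math.min(limit, queue.length);
  const workers = [];
  for (let i = 0; i < starters; i++) workers.push(next());
  return Promise.all(workers).then(() => results);
}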

A second take:

var urls = ...;
var limit = ...;
var dequeue = function () {
  // return an array containing up to `limit` urls,
  // removing them from the front of `urls`
  return urls.splice(0, limit);
};

var myFunction = function (dequeue) {
  var batch = dequeue();
  // process the whole batch in parallel; `processUrl` stands for
  // the per-url request function
  return $q.all(batch.map(processUrl));
};

myFunction(dequeue).then(function (result) {
  myFunction(dequeue).then(function (result) {
    myFunction(dequeue).then(function (result) {
      // ...
    });
  });
});

