Throttle Amount of Promises Open at a Given Time

You can do this in one short function. (Returns values in order per naomik's suggestion. Thanks!)

/**
 * Performs a list of callable actions (promise factories) so
 * that only a limited number of promises are pending at any
 * given time.
 *
 * @param listOfCallableActions An array of callable functions,
 *   which should return promises.
 * @param limit The maximum number of promises to have pending
 *   at once.
 * @returns A Promise that resolves to the full list of values
 *   when everything is done.
 */
function throttleActions(listOfCallableActions, limit) {
  // We'll need to store which is the next promise in the list.
  let i = 0;
  let resultArray = new Array(listOfCallableActions.length);

  // Now define what happens when any of the actions completes.
  // JavaScript is (mostly) single-threaded, so only one
  // completion handler will run at a given time. Because we
  // return doNextAction, the Promise chain continues as long as
  // there's an action left in the list.
  function doNextAction() {
    if (i < listOfCallableActions.length) {
      // Save the current value of i, so we can put the result
      // in the right place
      let actionIndex = i++;
      let nextAction = listOfCallableActions[actionIndex];
      return Promise.resolve(nextAction()).then(result => {
        // Save results to the correct array index.
        resultArray[actionIndex] = result;
      }).then(doNextAction);
    }
  }

  // Now start up the original <limit> number of promises.
  // i advances in calls to doNextAction.
  let listOfPromises = [];
  while (i < limit && i < listOfCallableActions.length) {
    listOfPromises.push(doNextAction());
  }
  return Promise.all(listOfPromises).then(() => resultArray);
}

// Test harness:

function delay(name, ms) {
  return new Promise((resolve, reject) => setTimeout(() => {
    console.log(name);
    resolve(name);
  }, ms));
}

var ps = [];
for (let i = 0; i < 10; i++) {
  ps.push(() => delay("promise " + i, Math.random() * 3000));
}

throttleActions(ps, 3).then(result => console.log(result));

How to throttle Promise.all() to 5 promises per second?

I hope this helps you.

Note that this approach uses Promise.all to collect the responses, so with a large list of queries your code will wait until every request has resolved before it sees any results, which can mean a lot of waiting.
Also, if any single request rejects, Promise.all rejects as a whole.
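
For example, a single rejection is enough to make the combined promise reject, even if the other requests succeed:

// Promise.all is fail-fast: one rejection rejects the whole thing
Promise.all([
  Promise.resolve(1),
  Promise.reject(new Error('boom')),
  Promise.resolve(3)
]).then(
  values => console.log(values),
  err => console.log('rejected:', err.message) // logs "rejected: boom"
);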

If you don't need all the results together, I suggest using something else instead, such as lodash's debounce or throttle, or a framework that handles this for you.

let items = [
  {name: 'item1'},
  {name: 'item2'},
  {name: 'item3'},
  {name: 'item4'},
  {name: 'item5'},
  {name: 'item6'}
];

// This is the api request that you send and that returns a promise
function apiCall(item) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(item.name), 1000);
  });
}

new Promise((resolve) => {
  let results = [];

  function sendReq(itemsList, iterate, apiCall) {
    setTimeout(() => {
      // slice itemsList to send requests according to the api limit
      let slicedArray = itemsList.slice(iterate * 5, iterate * 5 + 5);
      let result = slicedArray.map(item => apiCall(item));
      results = [...results, ...result];

      // This resolves the outer promise on the last iteration
      if (iterate === Math.ceil(items.length / 5) - 1) {
        resolve(results);
      }
    }, 1000 * iterate); // runs every 1000 ms (api limit of one second)
  }

  // Split the array (requests) into chunks of five items
  for (let i = 0; i < Math.ceil(items.length / 5); i++) {
    sendReq(items, i, apiCall);
  }
}).then(Promise.all.bind(Promise)).then(console.log);
// Use Promise.all to wait for all requests to resolve
// To use it this way binding is required

Limit concurrency of promise being run

I don't think there are any libraries to do this, but it's actually quite simple to implement yourself:

function sequential(fn) { // limitConcurrency(fn, 1)
  let q = Promise.resolve();
  return function (x) {
    const p = q.then(() => fn(x));
    q = p.reflect();
    return p;
  };
}

For multiple concurrent requests it gets a little trickier, but can be done as well.

function limitConcurrency(fn, n) {
  if (n == 1) return sequential(fn); // optimisation
  let q = Promise.resolve();
  const active = new Set();
  const fst = t => t[0];
  const snd = t => t[1];
  return function (x) {
    function put() {
      const p = fn(x);
      const a = p.reflect().then(() => {
        active.delete(a);
      });
      active.add(a);
      return [Promise.race(active), p];
    }
    if (active.size < n) {
      const t = put();
      q = fst(t);
      return snd(t);
    } else {
      const r = q.then(put);
      q = r.then(fst);
      return r.then(snd);
    }
  };
}

Btw, you might want to have a look at the actor model and CSP. They can simplify dealing with such things; there are a few JS libraries for them out there as well.

Example

import Promise from 'bluebird'

function sequential(fn) {
  var q = Promise.resolve();
  return (...args) => {
    const p = q.then(() => fn(...args))
    q = p.reflect()
    return p
  }
}

async function _delayPromise (ms, str) {
  console.log(`${str} started`)
  await Promise.delay(ms)
  console.log(`${str} ended`)
  return str
}

let delayPromise = sequential(_delayPromise)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(200, "a:b")
  await delayPromise(300, "a:c")
}

async function b() {
  await delayPromise(400, "b:a")
  await delayPromise(500, "b:b")
  await delayPromise(600, "b:c")
}

a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done

Timed promise queue / throttle

Update

The last answer was wrong; this works, but I still think I can do better:

// call fn at most count times per delay.
const debounce = function (fn, delay, count) {
  let working = 0, queue = [];
  function work() {
    if ((queue.length === 0) || (working === count)) return;
    working++;
    Promise.delay(delay).tap(() => working--).then(work);
    let {context, args, resolve} = queue.shift();
    resolve(fn.apply(context, args));
  }
  return function debounced() {
    return new Promise(resolve => {
      queue.push({context: this, args: arguments, resolve});
      if (working < count) work();
    });
  };
};

function mockRequest() {
  console.log("making request");
  return Promise.delay(Math.random() * 100);
}

var bounced = debounce(mockRequest, 800, 5);
for (var i = 0; i < 5; i++) bounced();
setTimeout(function () {
  for (var i = 0; i < 20; i++) bounced();
}, 2000);

So you need to make the requests throttle function-wide - that's fine. Promises have queueing pretty much built in.

var p = Promise.resolve(); // our queue

function makeRequest() {
  p = p.then(function () { // queue the promise, wait for the queue
    return request("http://www.google.com");
  });
  var p2 = p; // get a local reference to the promise
  // add 1000 ms delay to queue so the next caller has to wait
  p = p.delay(1000);
  return p2;
}

Now makeRequest calls will be at least 1000ms apart.
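
For example (just a sketch, reusing the request helper above), even calls made back to back will still be spaced out:

for (var i = 0; i < 3; i++) {
  makeRequest().then(function (response) {
    console.log("got a response"); // the underlying requests start at least ~1000 ms apart
  });
}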

jfriend has pointed out that you need two requests per second and not a single one - this is just as easily solvable with a second queue:

var p = Promise.resolve(1); // our first queue
var p2 = Promise.resolve(2); // our second queue

function makeRequest() {

  var turn = Promise.any([p, p2]).then(function (val) {

    // add a 1000 ms delay to the queue so the next caller has to wait,
    // then resolve with the queue's number again so later calls can still
    // tell the two queues apart.
    // Here we wait for the request too, although that's not really needed;
    // check both options out and decide which works better in your case.
    if (val === 1) {
      p = p.return(turn).delay(1000).return(1);
    } else {
      p2 = p2.return(turn).delay(1000).return(2);
    }
    return request("http://www.google.com");
  });

  return turn; // return the actual promise
}

This can be generalized to n promises using an array similarly, as sketched below.
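
A rough sketch of that generalization (assuming Bluebird and the same request helper as above; makeLimiter is a name made up for this example):

function makeLimiter(n, intervalMs) {
  // one queue slot per allowed request in the interval; each resolves with its index
  var queues = [];
  for (var k = 0; k < n; k++) queues.push(Promise.resolve(k));

  return function makeRequest() {
    var turn = Promise.any(queues).then(function (idx) {
      // occupy this slot until the request and the interval have both passed,
      // then resolve with the slot index again so it stays identifiable
      queues[idx] = queues[idx].return(turn).delay(intervalMs).return(idx);
      return request("http://www.google.com");
    });
    return turn;
  };
}

var makeRequest = makeLimiter(2, 1000); // at most two requests per second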

What is the best way to limit concurrency when using ES6's Promise.all()?

Note that Promise.all() doesn't trigger the promises to start their work, creating the promise itself does.
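
You can see this with a quick experiment; the executor runs as soon as the promise is created, not when Promise.all is called:

const p = new Promise((resolve) => {
  console.log('work started');           // logs immediately, on creation
  setTimeout(() => resolve('done'), 100);
});
// Promise.all only waits for promises that already exist (and are already running)
Promise.all([p]).then(([v]) => console.log(v)); // 'done'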

With that in mind, one solution would be to check whenever a promise is resolved whether a new promise should be started or whether you're already at the limit.

However, there is really no need to reinvent the wheel here. One library that you could use for this purpose is es6-promise-pool. From their examples:

var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here.
  // If there is work left to be done, return the next work item as a promise.
  // Otherwise, return null to indicate that all promises have been created.
  // Scroll down for an example.
}

// The number of promises to process simultaneously.
var concurrency = 3

// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool.
var poolPromise = pool.start()

// Wait for the pool to settle.
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
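
As a rough sketch of what the producer might look like (my own example, not taken from the library's docs): it returns five delayed promises one at a time, then returns null to signal that there is no more work.

var count = 0
var promiseProducer = function () {
  if (count >= 5) {
    return null // nothing left: the pool settles once the pending promises finish
  }
  count++
  var value = count
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(value) }, 1000)
  })
}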

I need to throttle the amount of requests this for loop sends

Make sure you call throttledRequest in the scope that invokes all of the requests, not once per character/accessToken/slug, so that it counts all of the requests together.

// a function that throttles the requests to at most 10 per second
const throttledRequest = () => {
  let requestCount = 0;

  const fetchRequest = (accessToken, slug, name) =>
    fetch(
      `https://us.api.blizzard.com/profile/wow/character/${slug}/${name}?namespace=profile-us&locale=en_US`,
      {
        method: "GET",
        headers: {
          Authorization: `Bearer ${accessToken}`,
          "Content-Type": "application/json",
        },
      }
    ).then((response) => {
      console.log(response.status);
      return response.json();
    }).catch((err) => console.error(err));

  return (accessToken, slug, name) => {
    if (requestCount < 10) {
      requestCount++;
      return fetchRequest(accessToken, slug, name);
    } else {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          requestCount = 0;
          resolve(fetchRequest(accessToken, slug, name));
        }, 1000);
      });
    }
  };
};

const runWithAsyncAwait = async () => {
  const request = throttledRequest();
  for (let i = 0; i < 5; i++) {
    const data = await request("accessToken", slug[i], "name");
    profiledata(data);
  }
};
const runWithPromises = async () => {
  const request = throttledRequest();
  for (let i = 0; i < 5; i++) {
    request("accessToken", slug[i], "name").then((data) => profiledata(data));
  }
};

Custom for..of using queue to solve concurrency in promises yields slower requests than Promise.all

It should not be too surprising that, when choosing between requesting 15 items simultaneously and making at most 5 requests at a time, the throttled/queued implementation takes longer. What was being done in parallel is now being done semi-serially. Presumably the point of throttling or serializing here is not to save wall-clock time, but to even out server load or to let the browser/network prioritize other connections within its own limits.

One aspect of your code that you might not have expected, however: Your Promise.all batches of 5 will now take as long as the longest request before the next batch starts. Consequently, just before a new batch starts, it is likely that far fewer than 5 requests are currently open.

(A1-------)             (B1-----------)(C1--)                 |
(A2---------) (B2--) (C2-------------------)|
(A3---) (B3-------) (C3------) | done
(A4--------------------)(B4----) (C4----------) |
(A5-----) (B5---------) (C5----) |

TIME->--------------------------------------------------------|

In other implementations like PLimit and mine on SO, there are no intermediate calls to Promise.all, so you can complete those calls more optimally while also ensuring that no more than 5 open promises exist at once.

( 1-------)( 8-----------)(14--)            |
( 2---------)( 9--)(11-------------------) |
( 3---)( 6-------)(10------) | done
( 4--------------------)(13----) |
( 5-----)( 7---------)(12----)(15----------)|

TIME->--------------------------------------|

For this reason, you may want to abandon the use of Promise.all to better keep your open channels/threads saturated.
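
A minimal sketch of that idea (a hypothetical helper, not any library's API): start `limit` workers that each pull the next task as soon as their current one finishes, so the number of open requests stays saturated without batch boundaries.

// tasks: an array of functions that each return a promise
async function runPool(tasks, limit) {
  const results = new Array(tasks.length);
  let next = 0;
  async function worker() {
    while (next < tasks.length) {
      const i = next++;              // claim the next task index
      results[i] = await tasks[i](); // start it only once this worker's slot is free
    }
  }
  const workers = Array.from({ length: Math.min(limit, tasks.length) }, worker);
  await Promise.all(workers); // waits for the workers, not for fixed-size batches
  return results;
}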


