使用 ES6 的 Promise.all() 时限制并发性的最佳方法是什么?

What is the best way to limit concurrency when using ES6's Promise.all()?

本文关键字:并发 最佳 是什么 方法 Promise ES6 all 使用      更新时间:2023-09-26

我有一些代码正在迭代从数据库中查询的列表,并为该列表中的每个元素发出HTTP请求。 该列表有时可能是一个相当大的数字(数千个(,我想确保我不会遇到具有数千个并发HTTP请求的Web服务器。


function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
Promise.all(getCounts()).then(() => { /* snip */});

此代码在节点 4.3.2 上运行。 重申一下,是否可以管理Promise.all,以便在任何给定时间只有一定数量的承诺在进行中?

P 限制

我将承诺并发限制与自定义脚本、蓝鸟、es6-promise-pool 和 p 限制进行了比较。我相信 p-limit 有最简单的,精简的实现来满足这个需求。请参阅他们的文档。



  • ECMAScript 2017 (版本 8(
  • 节点版本> 8.2.1


在此示例中,我们需要为数组中的每个 URL 运行一个函数(例如,可能是 API 请求(。这里这被称为 fetchData() .如果我们有数千个项目的数组要处理,并发对于节省 CPU 和内存资源肯定很有用。

const pLimit = require('p-limit');
// Example Concurrency of 3 promise at once
const limit = pLimit(3);
let urls = [
// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {
    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);


使用 Array.prototype.splice

while (funcs.length) {
  // 100 at a time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )


/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()

// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)

我们可以使用相同的迭代器并在 worker 之间共享它。

如果您使用 .entries() 而不是 .values() 你会得到一个产生 [index, value] 的迭代器,我将在下面以 2 的并发性进行演示

const sleep = t => new Promise(rs => setTimeout(rs, t))
const iterator = Array.from('abcdefghij').entries()
// const results = [] || Array(someLength)
async function doWork (iterator, i) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(`Worker#${i}: ${index},${item}`)
    // in case you need to store the results in order
    // results[index] = item + item
    // or if the order dose not mather
    // results.push(item + item)
const workers = Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator
Promise.allSettled(workers).then(console.log.bind(null, 'done'))


更棒的是,你可以在节点中做stream.Readable.from(iterator)(最终也可以在whatwg流中(。 并且使用可转移的 ReadbleStream,如果您与 Web 工作者合作,这使得这种潜力在功能中非常有用 也用于表演

<小时 />

注意:与示例异步池相比,与此不同的是,它生成了两个工作线程,因此如果一个工作线程由于某种原因在索引 5 处抛出错误,它不会阻止另一个工作线程执行其余操作。因此,您从执行 2 个并发减少到 1 个。(所以它不会止步于此(所以我的建议是,你捕获doWork函数中的所有错误




但是,这里真的没有必要重新发明轮子。可用于此目的的一个库是 es6-promise-pool .从他们的例子中:

var PromisePool = require('es6-promise-pool')
var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
// The number of promises to process simultaneously. 
var concurrency = 3
// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)
// Start the pool. 
var poolPromise = pool.start()
// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)

与其使用承诺来限制 http 请求,不如使用 node 的内置 http。Agent.maxSockets。这消除了使用库或编写自己的池代码的要求,并且具有对限制内容的更多控制的额外优势。




var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

如果向同一源发出多个请求,将keepAlive设置为 true 也可能对您有所帮助(有关详细信息,请参阅上面的文档(。

bluebird的Promise.map可以采用并发选项来控制应该并行运行多少个承诺。有时它比.all容易,因为您不需要创建 promise 数组。

const Promise = require('bluebird')
function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
  }, {concurrency: 10}); // <---- at most 10 http requests at a time



所以,这里有一个代码示例,部分基于Endless 的答案和 T.J. Crowder 的答案。

// example tasks that sleep and return a number
// in real life, you'd probably fetch URLs or something
const tasks = [];
for (let i = 0; i < 20; i++) {
    tasks.push(async () => {
        console.log(`start ${i}`);
        await sleep(Math.random() * 1000);
        console.log(`end ${i}`);
        return i;
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
(async () => {
    for await (let value of runTasks(3, tasks.values())) {
        console.log(`output ${value}`);
async function* runTasks(maxConcurrency, taskIterator) {
    async function* createWorkerIterator() {
        // Each AsyncGenerator that this function* creates is a worker,
        // polling for tasks from the shared taskIterator. Sharing the
        // taskIterator ensures that each worker gets unique tasks.
        for (const task of taskIterator) yield await task();
    const asyncIterators = new Array(maxConcurrency);
    for (let i = 0; i < maxConcurrency; i++) {
        asyncIterators[i] = createWorkerIterator();
    yield* raceAsyncIterators(asyncIterators);
async function* raceAsyncIterators(asyncIterators) {
    async function nextResultWithItsIterator(iterator) {
        return { result: await iterator.next(), iterator: iterator };
    /** @type Map<AsyncIterator<T>,
        Promise<{result: IteratorResult<T>, iterator: AsyncIterator<T>}>> */
    const promises = new Map();
    for (const iterator of asyncIterators) {
        promises.set(iterator, nextResultWithItsIterator(iterator));
    while (promises.size) {
        const { result, iterator } = await Promise.race(promises.values());
        if (result.done) {
        } else {
            promises.set(iterator, nextResultWithItsIterator(iterator));
            yield result.value;


此解决方案是围绕异步生成器函数构建的,许多 JS 开发人员可能不熟悉。

生成器函数(又名function*函数(返回一个"生成器",即结果的迭代器。允许生成器函数使用 yield 关键字,而您通常可能使用 return 关键字。调用方第一次在生成器上调用 next()(或使用 for...of 循环(时,function* 函数将运行,直到它yield值;这将成为迭代器的next()值。但是在随后调用 next() 时,生成器函数将从 yield 语句恢复,就在它中断的地方,即使它处于循环中间。(您也可以yield* ,以生成另一个生成器函数的所有结果。

"异步生成器函数"(async function*(是一个返回"异步迭代器"的生成器函数,它是承诺的迭代器。可以在异步迭代器上调用for await...of。异步生成器函数可以使用 await 关键字,就像您在任何async function中所做的那样。


runTasks() 是一个异步生成器函数,因此我们可以用 for await...of 循环调用它。每次循环运行时,我们都会处理最新完成任务的结果。

runTasks()创建 N 个异步迭代器,即"worker"。 每个工作线程从共享taskIterator轮询任务,确保每个工作线程获得唯一的任务。

该示例使用 3 个并发工作线程调用runTasks,因此同时启动的任务不超过 3 个。当任何任务完成时,我们会立即将下一个任务排队。(这优于"批处理",在"批处理"中,您一次执行 3 个任务,等待所有三个任务,并且在整个前一批完成之前不要开始下一批 3 个任务。

runTasks() 最后用 yield* raceAsyncIterators() "赛车"其异步迭代器。 raceAsyncIterators()就像Promise.race(),但它与 N 个承诺迭代器比赛,而不仅仅是 N 个承诺;它返回一个异步迭代器,该迭代器产生已解析的承诺的结果。

raceAsyncIterators()首先定义从每个迭代器到承诺的promises Map。每个承诺都是迭代结果以及生成它的迭代器的承诺。

使用 promises 映射,我们可以Promise.race()映射的值,从而为我们提供获胜的迭代结果及其迭代器。如果迭代器完全done,我们将其从映射中删除;否则,我们将它在promises映射中的承诺替换为迭代器的next()承诺和yield result.value

总之,runTasks() 是一个异步生成器函数,它产生与任务的 N 个并发异步迭代器进行竞速的结果,因此最终用户只需for await (let value of runTasks(3, tasks.values()))在每个结果可用时立即对其进行处理。


npm install tiny-async-pool


使用本机 ES6/ES7 以有限的并发运行多个承诺返回和异步函数

asyncPool 在有限的并发池中运行多个 promise-return 和 async 函数。一旦其中一个承诺被拒绝,它就会立即拒绝。当所有承诺完成时,它会解决。它尽快调用迭代器函数(在并发限制下(。


const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.





const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4


async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
  return outs;
async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);


// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
}, delay));
// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];
// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.
// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms
// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms
// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms
console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3
// Conclusion: Execution order and performance is different,
// but return order is still identical






其他人已经建议的异步池库可能是我的实现的更好选择,因为它的工作方式几乎相同,并且具有更简洁的实现,巧妙地使用了 Promise.race((: https://github.com/rxaviers/async-pool/blob/master/lib/es7.js



async function main() {
  const s = new Semaphore(100);
  const res = await Promise.all(
    entities.map((users) => 
      s.runExclusive(() => remoteServer.getCount(user))
  return res;

我正在使用来自 async-mutex 的信号量实现,它有不错的文档和 TypeScript 支持。

如果你想深入挖掘这样的主题,你可以看看这本书"信号量的小书",它以PDF格式免费提供 这里

不幸的是,使用原生 Promise.all 无法做到这一点,所以你必须有创造力。


它利用了一个称为迭代器的较新的JavaScript功能。 迭代器基本上跟踪哪些项目已处理,哪些项目尚未处理。

为了在代码中使用它,您需要创建一个异步函数数组。 每个异步函数向同一迭代器询问需要处理的下一项。 每个函数异步处理自己的项目,完成后向迭代器请求一个新项目。 一旦迭代器用完了项目,所有函数就会完成。


const items = [
// get a cursor that keeps track of what items have already been processed.
let cursor = items.entries();
// create 5 for loops that each run off the same cursor which keeps track of location
Array(5).fill().forEach(async () => {
    for (let [index, url] of cursor){
        console.log('getting url is ', index, url)
        // run your async task instead of this next line
        var text = await fetch(url).then(res => res.text())
        console.log('text is', text.slice(0, 20))

这是流式传输和"p-limit"的基本示例。它将http读取流流式传输到mongo db。

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;

const pipeline = util.promisify(stream.pipeline)
const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
const limit = pLimit(3);
async yrAsyncStreamingFunction(readStream) => {
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => {
                let someDataPromise = limit(() => yr_async_call_to_somewhere())
                        function handleResolve(someData) {
                            data.someData = someData;    
                            done(null, data);
                        function handleError(error) {
            await pipeline(

这么多好的解决方案。 我从 @Endless 发布的优雅解决方案开始,最终得到了这个小扩展方法,它不使用任何外部库,也不批量运行(尽管假设您具有异步等功能(:

Promise.allWithLimit = async (taskList, limit = 5) => {
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => {
            try {
                let entry = iterator.next();
                while (!entry.done) {
                    let [index, promise] = entry.value;
                    try {
                        results[index] = await promise;
                        entry = iterator.next();
                    catch (err) {
                        results[index] = err;
                // No more work to do
            catch (err) {
                // This worker is dead
    await Promise.all(workerThreads);
    return results;

    Promise.allWithLimit = async (taskList, limit = 5) => {
        const iterator = taskList.entries();
        let results = new Array(taskList.length);
        let workerThreads = new Array(limit).fill(0).map(() => 
            new Promise(async (resolve, reject) => {
                try {
                    let entry = iterator.next();
                    while (!entry.done) {
                        let [index, promise] = entry.value;
                        try {
                            results[index] = await promise;
                            entry = iterator.next();
                        catch (err) {
                            results[index] = err;
                    // No more work to do
                catch (err) {
                    // This worker is dead
        await Promise.all(workerThreads);
        return results;
    const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
       let n = (i + 1) * 5;
       setTimeout(() => {
          console.log(`Did nothing for ${n} seconds`);
       }, n * 1000);
    var results = Promise.allWithLimit(demoTasks);

  • @tcooc的回答相当冷静。不知道它,将来会利用它。
  • 我也喜欢@MatthewRideout的回答,但它使用外部库!!


 class Pool{
        constructor(maxAsync) {
            this.maxAsync = maxAsync;
            this.asyncOperationsQueue = [];
            this.currentAsyncOperations = 0
        runAnother() {
            if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
                this.currentAsyncOperations += 1;
                    .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
        add(f){  // the argument f is a function of signature () => Promise
            return new Promise((resolve, reject) => {
                    () => f().then(resolve).catch(reject)
//                        TESTS
function dbCall(id, timeout, fail) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (fail) {
               reject(`Error for id ${id}`);
            } else {
        }, timeout)

const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);

const cappedPool = new Pool(2);
const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

中的线程池。使用 const cappedPool = new Pool(2) 创建池的一个实例后,您只需cappedPool.add(() => myPromise) .


const resultPromise = cappedPool.add( () => dbCall(...))
.then( actualResult => {
   // Do something with the result form the DB

此解决方案使用异步生成器来管理带有原版 javascript 的并发承诺。throttle生成器有 3 个参数:

  • 要作为参数提供给 promise 生成函数的值数组。(例如,网址数组。
  • 返回承诺的函数。(例如,返回 HTTP 请求的承诺。
  • 一个整数,表示允许的最大并发承诺数。

承诺仅根据需要实例化,以减少内存消耗。可以使用 for await 对结果进行迭代...的声明。

下面的示例提供了一个用于检查承诺状态的函数、限制异步生成器,以及一个基于 setTimeout 返回承诺的简单函数。最后的异步IIFE定义超时值的储存库,设置throttle返回的异步可迭代对象,然后在结果解析时迭代结果。


请注意,异步生成器需要 Node.js 16+

const promiseState = function( promise ) {
  const control = Symbol();
  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? 'pending' : 'fulfilled' )
    .catch( () => 'rejected' );
const throttle = async function* ( reservoir, promiseClass, highWaterMark ) {
  let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseClass( item ) );
  while ( iterable.length > 0 ) {
    await Promise.any( iterable );
    const pending = [];
    const resolved = [];
    for ( const currentValue of iterable ) {
      if ( await promiseState( currentValue ) === 'pending' ) {
        pending.push( currentValue );
      } else {
        resolved.push( currentValue );
    console.log({ pending, resolved, reservoir });
    iterable = [
      ...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseClass( value ) )
    yield Promise.allSettled( resolved );
const getTimeout = delay => new Promise( ( resolve, reject ) => {
  setTimeout(resolve, delay, delay);
} );
( async () => {
  const test = [ 1100, 1200, 1300, 10000, 11000, 9000, 5000, 6000, 3000, 4000, 1000, 2000, 3500 ];
  const throttledRequests = throttle( test, getTimeout, 4 );
  for await ( const timeout of throttledRequests ) {
    console.log( timeout );
} )();

使用 tiny-async-pool ES9 进行等待...的 API,您可以执行以下操作:

const asyncPool = require("tiny-async-pool");
const getCount = async (user) => ([user, remoteServer.getCount(user)]);
const concurrency = 2;
for await (const [user, count] of asyncPool(concurrency, users, getCount)) {
  console.log(user, count);

上面的 asyncPool 函数返回一个异步迭代器,该迭代器在承诺完成(在并发限制下(后立即生成,并在其中一个承诺拒绝后立即拒绝。

下面的concurrent函数将返回一个 Promise,该 Promise 解析为已解析的承诺值数组,同时实现并发限制。 没有第三方库。

// waits 50 ms then resolves to the passed-in arg
const sleepAndResolve = s => new Promise(rs => setTimeout(()=>rs(s), 50))
// queue 100 promises
const funcs = []
for(let i=0; i<100; i++) funcs.push(()=>sleepAndResolve(i))
//run the promises with a max concurrency of 10
.then(console.log) // prints [0,1,2...,99]
.catch(()=>console.log("there was an error"))
 * Run concurrent promises with a maximum concurrency level
 * @param concurrency The number of concurrently running promises
 * @param funcs An array of functions that return promises
 * @returns a promise that resolves to an array of the resolved values from the promises returned by funcs
function concurrent(concurrency, funcs) {
    return new Promise((resolve, reject) => {
        let index = -1;
        const p = [];
        for (let i = 0; i < Math.max(1, Math.min(concurrency, funcs.length)); i++)
        function runPromise() {
            if (++index < funcs.length)
                (p[p.length] = funcs[index]()).then(runPromise).catch(reject);
            else if (index === funcs.length)


 * Run concurrent promises with a maximum concurrency level
 * @param concurrency The number of concurrently running promises
 * @param funcs An array of functions that return promises
 * @returns a promise that resolves to an array of the resolved values from the promises returned by funcs
function concurrent<V>(concurrency:number, funcs:(()=>Promise<V>)[]):Promise<V[]> {
  return new Promise((resolve,reject)=>{
    let index = -1;
    const p:Promise<V>[] = []
    for(let i=0; i<Math.max(1,Math.min(concurrency, funcs.length)); i++) runPromise()
    function runPromise() {
      if (++index < funcs.length) (p[p.length] = funcs[index]()).then(runPromise).catch(reject)
      else if (index === funcs.length) Promise.all(p).then(resolve).catch(reject)

已经有很多答案了,但我最终使用了一个非常简单、不需要库或睡眠的解决方案,它只使用几个命令。Promise.all(( 只是让你知道传递给它的所有承诺何时完成。因此,您可以间歇性地检查队列以查看它是否已准备好进行更多工作,如果是,请添加更多进程。


// init vars
const batchSize = 5
const calls = []
// loop through data and run processes  
for (let [index, data] of [1,2,3].entries()) {
   // pile on async processes 
   // every 5th concurrent call, wait for them to finish before adding more
   if (index % batchSize === 0) await Promise.all(calls)
// clean up for any data to process left over if smaller than batch size
const allFinishedProcs = await Promise.all(calls)





async function batchQuery(queries, limit) {
  limit = Math.min(queries.length, limit);
  return new Promise((resolve, reject) => {
    const responsesOrErrors = new Array(queries.length);
    let startedCount = 0;
    let finishedCount = 0;
    let hasErrors = false;
    function recursiveQuery() {
      let index = startedCount++;
        .then(res => {
          responsesOrErrors[index] = res;
        .catch(error => {
          responsesOrErrors[index] = error;
          hasErrors = true;
        .finally(() => {
          if (finishedCount === queries.length) {
            hasErrors ? reject(responsesOrErrors) : resolve(responsesOrErrors);
          } else if (startedCount < queries.length) {
    for (let i = 0; i < limit; i++) {
async function doQuery(query) {
  console.log(`${query} started`);
  const delay = Math.floor(Math.random() * 1500);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (delay <= 1000) {
        console.log(`${query} finished successfully`);
        resolve(`${query} success`);
      } else {
        console.log(`${query} finished with error`);
        reject(`${query} error`);
    }, delay);
const queries = new Array(10).fill('query').map((query, index) => `${query}_${index + 1}`);
batchQuery(queries, 3)
  .then(responses => console.log('All successfull', responses))
  .catch(responsesWithErrors => console.log('All with several failed', responsesWithErrors));

所以我试图使一些显示的示例适用于我的代码,但由于这仅适用于导入脚本而不是生产代码,因此使用 npm 包批处理承诺对我来说肯定是最简单的途径


应用程序接口batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee(承诺:迭代将在每批之后调用。


Easily batch promises
NOTE: Requires runtime to support Promise or to be polyfilled.
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.
import batchPromises from 'batch-promises';
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
  }, 100);
.then(results => {
  console.log(results); // [1,2,3,4,5]


  var self = this;
  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
  return tracker(0); 

扩展@deceleratedcaviar发布的答案,我创建了一个"批处理"实用程序函数,该函数以参数为:值数组、并发限制和处理函数。是的,我意识到使用 Promise.all 这种方式更类似于批处理而不是真正的并发性,但如果目标是一次限制过多的 HTTP 调用,我会使用这种方法,因为它很简单并且不需要外部库。

async function batch(o) {
  let arr = o.arr
  let resp = []
  while (arr.length) {
    let subset = arr.splice(0, o.limit)
    let results = await Promise.all(subset.map(o.process))
  return [].concat.apply([], resp)
let arr = []
for (let i = 0; i < 250; i++) { arr.push(i) }
async function calc(val) { return val * 100 }
(async () => {
  let resp = await batch({
    arr: arr,
    limit: 100,
    process: calc

另一个带有自定义承诺库 (CPromise( 的解决方案:

  • 使用生成器 实时代码沙盒演示
    import { CPromise } from "c-promise2";
    import cpFetch from "cp-fetch";
    const promise = CPromise.all(
      function* () {
        const urls = [
        for (const url of urls) {
          yield cpFetch(url); // add a promise to the pool
          console.log(`Request [${url}] completed`);
      { concurrency: 2 }
      (v) => console.log(`Done: `, v),
      (e) => console.warn(`Failed: ${e}`)
    // yeah, we able to cancel the task and abort pending network requests
    // setTimeout(() => promise.cancel(), 4500);
  • 使用映射器实时代码沙盒演示

    import { CPromise } from "c-promise2";
    import cpFetch from "cp-fetch";
    const promise = CPromise.all(
        mapper: (url) => {
          console.log(`Request [${url}]`);
          return cpFetch(url);
        concurrency: 2
      (v) => console.log(`Done: `, v),
      (e) => console.warn(`Failed: ${e}`)
    // yeah, we able to cancel the task and abort pending network requests
    //setTimeout(() => promise.cancel(), 4500);

警告 这还没有针对效率进行基准测试,并且执行了大量的数组复制/创建


import chunk from 'lodash.chunk';
const maxConcurrency = (max) => (dataArr, promiseFn) =>
  chunk(dataArr, max).reduce(
      async (agg, batch) => [
          ...(await agg),
          ...(await Promise.all(batch.map(promiseFn)))


const randomFn = (data) =>
    new Promise((res) => setTimeout(
      () => res(data + 1),
        Math.random() * 1000

const result = await maxConcurrency(5)(
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
console.log('result+++', result);

我有创建块并使用 .reduce 函数等待每个块承诺.alls 完成的解决方案。如果承诺有一些调用限制,我还会添加一些延迟。

export function delay(ms: number) {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
export const chunk = <T>(arr: T[], size: number): T[][] => [
  ...Array(Math.ceil(arr.length / size)),
].map((_, i) => arr.slice(size * i, size + size * i));
const myIdlist = []; // all items
const groupedIdList = chunk(myIdList, 20); // grouped by 20 items
await groupedIdList.reduce(async (prev, subIdList) => {
  await prev;
  // Make sure we wait for 500 ms after processing every page to prevent overloading the calls.
  const data = await Promise.all(subIdList.map(myPromise));
  await delay(500);
}, Promise.resolve());

可以使用 https://www.npmjs.com/package/job-pipe 来限制对服务器的请求


const pipe = createPipe({ throughput: 6, maxQueueSize: Infinity })


const makeCall = async () => {...}
const limitedMakeCall = pipe(makeCall)


await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()



async function async_arr<T1, T2>(
    arr: T1[],
    func: (x: T1) => Promise<T2> | T2, //can be sync or async
    limit = 5
) {
    let results: T2[] = [];
    let workers = [];
    let current = Math.min(arr.length, limit);
    async function process(i) {
        if (i < arr.length) {
            results[i] = await Promise.resolve(func(arr[i]));
            await process(current++);
    for (let i = 0; i < current; i++) {
    await Promise.all(workers);
    return results;

这是我的食谱,基于killdash9的答案。它允许选择异常的行为(Promise.all vs Promise.allSettled(。

// Given an array of async functions, runs them in parallel,
// with at most maxConcurrency simultaneous executions
// Except for that, behaves the same as Promise.all,
// unless allSettled is true, where it behaves as Promise.allSettled  
function concurrentRun(maxConcurrency = 10, funcs = [], allSettled = false) {
  if (funcs.length <= maxConcurrency) {
    const ps = funcs.map(f => f());
    return allSettled ? Promise.allSettled(ps) : Promise.all(ps);
  return new Promise((resolve, reject) => {
    let idx = -1;
    const ps = new Array(funcs.length);
    function nextPromise() {
      idx += 1;
      if (idx < funcs.length) {
        (ps[idx] = funcs[idx]()).then(nextPromise).catch(allSettled ? nextPromise : reject);
      } else if (idx === funcs.length) {
        (allSettled ? Promise.allSettled(ps) : Promise.all(ps)).then(resolve).catch(reject);
    for (let i = 0; i < maxConcurrency; i += 1) nextPromise();
<</div> div class="answers">



import {pipeAsync, map, page} from 'iter-ops';
const i = pipeAsync(
    users, // make it asynchronous
    page(10), // split into pages of 10 items in each
    map(p => Promise.all(p.map(u => u.remoteServer.getCount(u)))), // map into requests
    wait() // resolve each page in the pipeline
// below triggers processing page-by-page:
for await(const p of i) {
    //=> p = resolved page of data




const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
