使用 ES6 的 Promise.all() 时限制并发性的最佳方法是什么?
What is the best way to limit concurrency when using ES6's Promise.all()?
我有一些代码正在迭代从数据库中查询的列表,并为该列表中的每个元素发出HTTP请求。 该列表有时可能是一个相当大的数字(数千个(,我想确保我不会遇到具有数千个并发HTTP请求的Web服务器。
此代码的缩写版本目前如下所示...
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
此代码在节点 4.3.2 上运行。 重申一下,是否可以管理Promise.all
,以便在任何给定时间只有一定数量的承诺在进行中?
P 限制
我将承诺并发限制与自定义脚本、蓝鸟、es6-promise-pool 和 p 限制进行了比较。我相信 p-limit 有最简单的,精简的实现来满足这个需求。请参阅他们的文档。
要求
与示例中的异步兼容
- ECMAScript 2017 (版本 8(
- 节点版本> 8.2.1
我的例子
在此示例中,我们需要为数组中的每个 URL 运行一个函数(例如,可能是 API 请求(。这里这被称为 fetchData()
.如果我们有数千个项目的数组要处理,并发对于节省 CPU 和内存资源肯定很有用。
const pLimit = require('p-limit');
// Example Concurrency of 3 promise at once
const limit = pLimit(3);
let urls = [
"http://www.exampleone.com/",
"http://www.exampletwo.com/",
"http://www.examplethree.com/",
"http://www.examplefour.com/",
]
// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {
// wrap the function we are calling in the limit function we defined above
return limit(() => fetchData(url));
});
(async () => {
// Only three promises are run at once (as defined above)
const result = await Promise.all(promises);
console.log(result);
})();
控制台日志结果是已解决的承诺响应数据的数组。
使用 Array.prototype.splice
while (funcs.length) {
// 100 at a time
await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
如果您知道迭代器的工作原理以及它们的使用方式,那么您就不需要任何额外的库,因为自己构建自己的并发性变得非常容易。让我演示一下:
/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()
// loop over all items with for..of
for (const x of iterator) {
console.log('x:', x)
// notices how this loop continues the same iterator
// and consumes the rest of the iterator, making the
// outer loop not logging any more x's
for (const y of iterator) {
console.log('y:', y)
}
}
我们可以使用相同的迭代器并在 worker 之间共享它。
如果您使用 .entries()
而不是 .values()
你会得到一个产生 [index, value]
的迭代器,我将在下面以 2 的并发性进行演示
const sleep = t => new Promise(rs => setTimeout(rs, t))
const iterator = Array.from('abcdefghij').entries()
// const results = [] || Array(someLength)
async function doWork (iterator, i) {
for (let [index, item] of iterator) {
await sleep(1000)
console.log(`Worker#${i}: ${index},${item}`)
// in case you need to store the results in order
// results[index] = item + item
// or if the order dose not mather
// results.push(item + item)
}
}
const workers = Array(2).fill(iterator).map(doWork)
// ^--- starts two workers sharing the same iterator
Promise.allSettled(workers).then(console.log.bind(null, 'done'))
这样做的好处是,您可以拥有生成器功能,而不是一次准备好所有东西。
更棒的是,你可以在节点中做stream.Readable.from(iterator)
(最终也可以在whatwg流中(。 并且使用可转移的 ReadbleStream,如果您与 Web 工作者合作,这使得这种潜力在功能中非常有用 也用于表演
注意:与示例异步池相比,与此不同的是,它生成了两个工作线程,因此如果一个工作线程由于某种原因在索引 5 处抛出错误,它不会阻止另一个工作线程执行其余操作。因此,您从执行 2 个并发减少到 1 个。(所以它不会止步于此(所以我的建议是,你捕获doWork
函数中的所有错误
,Promise.all()
不会触发承诺开始工作,创建承诺本身会触发
考虑到这一点,一种解决方案是在解决承诺时检查是否应该启动新承诺或您是否已经达到极限。
但是,这里真的没有必要重新发明轮子。可用于此目的的一个库是 es6-promise-pool
.从他们的例子中:
var PromisePool = require('es6-promise-pool')
var promiseProducer = function () {
// Your code goes here.
// If there is work left to be done, return the next work item as a promise.
// Otherwise, return null to indicate that all promises have been created.
// Scroll down for an example.
}
// The number of promises to process simultaneously.
var concurrency = 3
// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)
// Start the pool.
var poolPromise = pool.start()
// Wait for the pool to settle.
poolPromise.then(function () {
console.log('All promises fulfilled')
}, function (error) {
console.log('Some promise rejected: ' + error.message)
})
与其使用承诺来限制 http 请求,不如使用 node 的内置 http。Agent.maxSockets。这消除了使用库或编写自己的池代码的要求,并且具有对限制内容的更多控制的额外优势。
agent.maxSockets
默认情况下设置为无穷大。确定代理可以为每个源打开多少个并发套接字。源是"主机:端口"或"主机:端口:本地地址"的组合。
例如:
var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);
如果向同一源发出多个请求,将keepAlive
设置为 true 也可能对您有所帮助(有关详细信息,请参阅上面的文档(。
bluebird的Promise.map可以采用并发选项来控制应该并行运行多少个承诺。有时它比.all
容易,因为您不需要创建 promise 数组。
const Promise = require('bluebird')
function getCounts() {
return Promise.map(users, user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
}, {concurrency: 10}); // <---- at most 10 http requests at a time
}
正如此答案线程中的所有其他人所指出的那样,如果您需要限制并发性,Promise.all()
不会做正确的事情。但理想情况下,您甚至不应该等到所有承诺都完成后再处理它们。
相反,您希望在每个结果可用时尽快处理它,这样您就不必等待最后一个承诺完成才开始迭代它们。
所以,这里有一个代码示例,部分基于Endless 的答案和 T.J. Crowder 的答案。
// example tasks that sleep and return a number
// in real life, you'd probably fetch URLs or something
const tasks = [];
for (let i = 0; i < 20; i++) {
tasks.push(async () => {
console.log(`start ${i}`);
await sleep(Math.random() * 1000);
console.log(`end ${i}`);
return i;
});
}
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
(async () => {
for await (let value of runTasks(3, tasks.values())) {
console.log(`output ${value}`);
}
})();
async function* runTasks(maxConcurrency, taskIterator) {
async function* createWorkerIterator() {
// Each AsyncGenerator that this function* creates is a worker,
// polling for tasks from the shared taskIterator. Sharing the
// taskIterator ensures that each worker gets unique tasks.
for (const task of taskIterator) yield await task();
}
const asyncIterators = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++) {
asyncIterators[i] = createWorkerIterator();
}
yield* raceAsyncIterators(asyncIterators);
}
async function* raceAsyncIterators(asyncIterators) {
async function nextResultWithItsIterator(iterator) {
return { result: await iterator.next(), iterator: iterator };
}
/** @type Map<AsyncIterator<T>,
Promise<{result: IteratorResult<T>, iterator: AsyncIterator<T>}>> */
const promises = new Map();
for (const iterator of asyncIterators) {
promises.set(iterator, nextResultWithItsIterator(iterator));
}
while (promises.size) {
const { result, iterator } = await Promise.race(promises.values());
if (result.done) {
promises.delete(iterator);
} else {
promises.set(iterator, nextResultWithItsIterator(iterator));
yield result.value;
}
}
}
这里面有很多魔力;让我解释一下。
此解决方案是围绕异步生成器函数构建的,许多 JS 开发人员可能不熟悉。
生成器函数(又名function*
函数(返回一个"生成器",即结果的迭代器。允许生成器函数使用 yield
关键字,而您通常可能使用 return
关键字。调用方第一次在生成器上调用 next()
(或使用 for...of
循环(时,function*
函数将运行,直到它yield
值;这将成为迭代器的next()
值。但是在随后调用 next()
时,生成器函数将从 yield
语句恢复,就在它中断的地方,即使它处于循环中间。(您也可以yield*
,以生成另一个生成器函数的所有结果。
"异步生成器函数"(async function*
(是一个返回"异步迭代器"的生成器函数,它是承诺的迭代器。可以在异步迭代器上调用for await...of
。异步生成器函数可以使用 await
关键字,就像您在任何async function
中所做的那样。
在示例中,我们使用任务函数数组调用runTasks()
;我们在数组上调用.values()
以将数组转换为迭代器。
runTasks()
是一个异步生成器函数,因此我们可以用 for await...of
循环调用它。每次循环运行时,我们都会处理最新完成任务的结果。
runTasks()
创建 N 个异步迭代器,即"worker"。 每个工作线程从共享taskIterator
轮询任务,确保每个工作线程获得唯一的任务。
该示例使用 3 个并发工作线程调用runTasks
,因此同时启动的任务不超过 3 个。当任何任务完成时,我们会立即将下一个任务排队。(这优于"批处理",在"批处理"中,您一次执行 3 个任务,等待所有三个任务,并且在整个前一批完成之前不要开始下一批 3 个任务。
runTasks()
最后用 yield* raceAsyncIterators()
"赛车"其异步迭代器。 raceAsyncIterators()
就像Promise.race()
,但它与 N 个承诺迭代器比赛,而不仅仅是 N 个承诺;它返回一个异步迭代器,该迭代器产生已解析的承诺的结果。
raceAsyncIterators()
首先定义从每个迭代器到承诺的promises
Map
。每个承诺都是迭代结果以及生成它的迭代器的承诺。
使用 promises
映射,我们可以Promise.race()
映射的值,从而为我们提供获胜的迭代结果及其迭代器。如果迭代器完全done
,我们将其从映射中删除;否则,我们将它在promises
映射中的承诺替换为迭代器的next()
承诺和yield result.value
。
总之,runTasks()
是一个异步生成器函数,它产生与任务的 N 个并发异步迭代器进行竞速的结果,因此最终用户只需for await (let value of runTasks(3, tasks.values()))
在每个结果可用时立即对其进行处理。
我建议库异步池:https://github.com/rxaviers/async-pool
npm install tiny-async-pool
描述:
使用本机 ES6/ES7 以有限的并发运行多个承诺返回和异步函数
asyncPool 在有限的并发池中运行多个 promise-return 和 async 函数。一旦其中一个承诺被拒绝,它就会立即拒绝。当所有承诺完成时,它会解决。它尽快调用迭代器函数(在并发限制下(。
用法:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
的ES7解决方案,可以复制粘贴友好,并具有完整的Promise.all()
/map()
替代方案,并发限制。
与Promise.all()
类似,它维护返回订单以及非承诺返回值的回退。
我还包括了不同实现的比较,因为它说明了其他一些解决方案遗漏的某些方面。
用法
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
实现
async function asyncBatch(args, fn, limit = 8) {
// Copy arguments to avoid side effects
args = [...args];
const outs = [];
while (args.length) {
const batch = args.splice(0, limit);
const out = await Promise.all(batch.map(fn));
outs.push(...out);
}
return outs;
}
async function asyncPool(args, fn, limit = 8) {
return new Promise((resolve) => {
// Copy arguments to avoid side effect, reverse queue as
// pop is faster than shift
const argQueue = [...args].reverse();
let count = 0;
const outs = [];
const pollNext = () => {
if (argQueue.length === 0 && count === 0) {
resolve(outs);
} else {
while (count < limit && argQueue.length) {
const index = args.length - argQueue.length;
const arg = argQueue.pop();
count += 1;
const out = fn(arg);
const processOut = (out, index) => {
outs[index] = out;
count -= 1;
pollNext();
};
if (typeof out === 'object' && out.then) {
out.then(out => processOut(out, index));
} else {
processOut(out, index);
}
}
}
};
pollNext();
});
}
比较
// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
console.log(delay);
resolve(delay);
}, delay));
// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];
// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.
// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms
// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms
// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms
console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3
// Conclusion: Execution order and performance is different,
// but return order is still identical
结论
asyncPool()
应该是最好的解决方案,因为它允许新请求在任何以前的请求完成后立即启动。
asyncBatch()
作为比较包含在内,因为它的实现更易于理解,但它的性能应该较慢,因为同一批次中的所有请求都需要完成才能启动下一批。
在这个人为的例子中,非受限香草Promise.all()
当然是最快的,而其他的在现实世界的拥塞场景中可能表现得更理想。
更新
其他人已经建议的异步池库可能是我的实现的更好选择,因为它的工作方式几乎相同,并且具有更简洁的实现,巧妙地使用了 Promise.race((: https://github.com/rxaviers/async-pool/blob/master/lib/es7.js
希望我的回答仍然可以提供教育价值。
信号量是众所周知的并发原语,旨在解决类似的问题。这是非常通用的结构,信号量的实现存在于多种语言中。这就是使用信号量来解决此问题的方式:
async function main() {
const s = new Semaphore(100);
const res = await Promise.all(
entities.map((users) =>
s.runExclusive(() => remoteServer.getCount(user))
)
);
return res;
}
我正在使用来自 async-mutex 的信号量实现,它有不错的文档和 TypeScript 支持。
如果你想深入挖掘这样的主题,你可以看看这本书"信号量的小书",它以PDF格式免费提供 这里
不幸的是,使用原生 Promise.all 无法做到这一点,所以你必须有创造力。
这是我在不使用任何外部库的情况下能找到的最快、最简洁的方法。
它利用了一个称为迭代器的较新的JavaScript功能。 迭代器基本上跟踪哪些项目已处理,哪些项目尚未处理。
为了在代码中使用它,您需要创建一个异步函数数组。 每个异步函数向同一迭代器询问需要处理的下一项。 每个函数异步处理自己的项目,完成后向迭代器请求一个新项目。 一旦迭代器用完了项目,所有函数就会完成。
感谢@Endless的启发。
const items = [
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2'
]
// get a cursor that keeps track of what items have already been processed.
let cursor = items.entries();
// create 5 for loops that each run off the same cursor which keeps track of location
Array(5).fill().forEach(async () => {
for (let [index, url] of cursor){
console.log('getting url is ', index, url)
// run your async task instead of this next line
var text = await fetch(url).then(res => res.text())
console.log('text is', text.slice(0, 20))
}
})
这是流式传输和"p-limit"的基本示例。它将http读取流流式传输到mongo db。
const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const pipeline = util.promisify(stream.pipeline)
const outputDBConfig = {
dbURL: 'yr-db-url',
collection: 'some-collection'
};
const limit = pLimit(3);
async yrAsyncStreamingFunction(readStream) => {
const mongoWriteStream = streamToMongoDB(outputDBConfig);
const mapperStream = es.map((data, done) => {
let someDataPromise = limit(() => yr_async_call_to_somewhere())
someDataPromise.then(
function handleResolve(someData) {
data.someData = someData;
done(null, data);
},
function handleError(error) {
done(error)
}
);
})
await pipeline(
readStream,
JSONStream.parse('*'),
mapperStream,
mongoWriteStream
);
}
这么多好的解决方案。 我从 @Endless 发布的优雅解决方案开始,最终得到了这个小扩展方法,它不使用任何外部库,也不批量运行(尽管假设您具有异步等功能(:
Promise.allWithLimit = async (taskList, limit = 5) => {
const iterator = taskList.entries();
let results = new Array(taskList.length);
let workerThreads = new Array(limit).fill(0).map(() =>
new Promise(async (resolve, reject) => {
try {
let entry = iterator.next();
while (!entry.done) {
let [index, promise] = entry.value;
try {
results[index] = await promise;
entry = iterator.next();
}
catch (err) {
results[index] = err;
}
}
// No more work to do
resolve(true);
}
catch (err) {
// This worker is dead
reject(err);
}
}));
await Promise.all(workerThreads);
return results;
};
Promise.allWithLimit = async (taskList, limit = 5) => {
const iterator = taskList.entries();
let results = new Array(taskList.length);
let workerThreads = new Array(limit).fill(0).map(() =>
new Promise(async (resolve, reject) => {
try {
let entry = iterator.next();
while (!entry.done) {
let [index, promise] = entry.value;
try {
results[index] = await promise;
entry = iterator.next();
}
catch (err) {
results[index] = err;
}
}
// No more work to do
resolve(true);
}
catch (err) {
// This worker is dead
reject(err);
}
}));
await Promise.all(workerThreads);
return results;
};
const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
let n = (i + 1) * 5;
setTimeout(() => {
console.log(`Did nothing for ${n} seconds`);
resolve(n);
}, n * 1000);
}));
var results = Promise.allWithLimit(demoTasks);
- @tcooc的回答相当冷静。不知道它,将来会利用它。
- 我也喜欢@MatthewRideout的回答,但它使用外部库!!
只要有可能,我都会尝试自己开发这种东西,而不是去图书馆。你最终学到了很多以前看起来令人生畏的概念。
class Pool{
constructor(maxAsync) {
this.maxAsync = maxAsync;
this.asyncOperationsQueue = [];
this.currentAsyncOperations = 0
}
runAnother() {
if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
this.currentAsyncOperations += 1;
this.asyncOperationsQueue.pop()()
.then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
}
}
add(f){ // the argument f is a function of signature () => Promise
this.runAnother();
return new Promise((resolve, reject) => {
this.asyncOperationsQueue.push(
() => f().then(resolve).catch(reject)
)
})
}
}
//#######################################################
// TESTS
//#######################################################
function dbCall(id, timeout, fail) {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (fail) {
reject(`Error for id ${id}`);
} else {
resolve(id);
}
}, timeout)
}
)
}
const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);
const cappedPool = new Pool(2);
const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))
这种方法提供了一个很好的API,类似于scala/java.
中的线程池。使用 const cappedPool = new Pool(2)
创建池的一个实例后,您只需cappedPool.add(() => myPromise)
.
即可向其提供承诺不知不觉中,我们必须确保承诺不会立即开始,这就是为什么我们必须在该功能的帮助下"懒惰地提供它"。
最重要的是,请注意,方法add
的结果是一个承诺,它将以您原始承诺的价值完成/解决!这使得使用非常直观。
const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
// Do something with the result form the DB
}
)
此解决方案使用异步生成器来管理带有原版 javascript 的并发承诺。throttle
生成器有 3 个参数:
- 要作为参数提供给 promise 生成函数的值数组。(例如,网址数组。
- 返回承诺的函数。(例如,返回 HTTP 请求的承诺。
- 一个整数,表示允许的最大并发承诺数。
承诺仅根据需要实例化,以减少内存消耗。可以使用 for await 对结果进行迭代...的声明。
下面的示例提供了一个用于检查承诺状态的函数、限制异步生成器,以及一个基于 setTimeout 返回承诺的简单函数。最后的异步IIFE定义超时值的储存库,设置throttle
返回的异步可迭代对象,然后在结果解析时迭代结果。
如果您想要更完整的HTTP请求示例,请在评论中告诉我。
请注意,异步生成器需要 Node.js 16+。
const promiseState = function( promise ) {
const control = Symbol();
return Promise
.race([ promise, control ])
.then( value => ( value === control ) ? 'pending' : 'fulfilled' )
.catch( () => 'rejected' );
}
const throttle = async function* ( reservoir, promiseClass, highWaterMark ) {
let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseClass( item ) );
while ( iterable.length > 0 ) {
await Promise.any( iterable );
const pending = [];
const resolved = [];
for ( const currentValue of iterable ) {
if ( await promiseState( currentValue ) === 'pending' ) {
pending.push( currentValue );
} else {
resolved.push( currentValue );
}
}
console.log({ pending, resolved, reservoir });
iterable = [
...pending,
...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseClass( value ) )
];
yield Promise.allSettled( resolved );
}
}
const getTimeout = delay => new Promise( ( resolve, reject ) => {
setTimeout(resolve, delay, delay);
} );
( async () => {
const test = [ 1100, 1200, 1300, 10000, 11000, 9000, 5000, 6000, 3000, 4000, 1000, 2000, 3500 ];
const throttledRequests = throttle( test, getTimeout, 4 );
for await ( const timeout of throttledRequests ) {
console.log( timeout );
}
} )();
使用 tiny-async-pool
ES9 进行等待...的 API,您可以执行以下操作:
const asyncPool = require("tiny-async-pool");
const getCount = async (user) => ([user, remoteServer.getCount(user)]);
const concurrency = 2;
for await (const [user, count] of asyncPool(concurrency, users, getCount)) {
console.log(user, count);
}
上面的 asyncPool 函数返回一个异步迭代器,该迭代器在承诺完成(在并发限制下(后立即生成,并在其中一个承诺拒绝后立即拒绝。
下面的concurrent
函数将返回一个 Promise,该 Promise 解析为已解析的承诺值数组,同时实现并发限制。 没有第三方库。
// waits 50 ms then resolves to the passed-in arg
const sleepAndResolve = s => new Promise(rs => setTimeout(()=>rs(s), 50))
// queue 100 promises
const funcs = []
for(let i=0; i<100; i++) funcs.push(()=>sleepAndResolve(i))
//run the promises with a max concurrency of 10
concurrent(10,funcs)
.then(console.log) // prints [0,1,2...,99]
.catch(()=>console.log("there was an error"))
/**
* Run concurrent promises with a maximum concurrency level
* @param concurrency The number of concurrently running promises
* @param funcs An array of functions that return promises
* @returns a promise that resolves to an array of the resolved values from the promises returned by funcs
*/
function concurrent(concurrency, funcs) {
return new Promise((resolve, reject) => {
let index = -1;
const p = [];
for (let i = 0; i < Math.max(1, Math.min(concurrency, funcs.length)); i++)
runPromise();
function runPromise() {
if (++index < funcs.length)
(p[p.length] = funcs[index]()).then(runPromise).catch(reject);
else if (index === funcs.length)
Promise.all(p).then(resolve).catch(reject);
}
});
}
如果您有兴趣,这里是打字稿版本
/**
* Run concurrent promises with a maximum concurrency level
* @param concurrency The number of concurrently running promises
* @param funcs An array of functions that return promises
* @returns a promise that resolves to an array of the resolved values from the promises returned by funcs
*/
function concurrent<V>(concurrency:number, funcs:(()=>Promise<V>)[]):Promise<V[]> {
return new Promise((resolve,reject)=>{
let index = -1;
const p:Promise<V>[] = []
for(let i=0; i<Math.max(1,Math.min(concurrency, funcs.length)); i++) runPromise()
function runPromise() {
if (++index < funcs.length) (p[p.length] = funcs[index]()).then(runPromise).catch(reject)
else if (index === funcs.length) Promise.all(p).then(resolve).catch(reject)
}
})
}
已经有很多答案了,但我最终使用了一个非常简单、不需要库或睡眠的解决方案,它只使用几个命令。Promise.all(( 只是让你知道传递给它的所有承诺何时完成。因此,您可以间歇性地检查队列以查看它是否已准备好进行更多工作,如果是,请添加更多进程。
例如:
// init vars
const batchSize = 5
const calls = []
// loop through data and run processes
for (let [index, data] of [1,2,3].entries()) {
// pile on async processes
calls.push(doSomethingAsyncWithData(data))
// every 5th concurrent call, wait for them to finish before adding more
if (index % batchSize === 0) await Promise.all(calls)
}
// clean up for any data to process left over if smaller than batch size
const allFinishedProcs = await Promise.all(calls)
没有外部库。只是普通的JS。
它可以使用递归来解决。
这个想法是,最初我们立即执行允许的最大查询数,并且这些查询中的每一个都应该在完成时递归地启动一个新查询。
在此示例中,我将成功的响应与错误一起填充,并执行所有查询,但如果要在第一次失败时终止批处理执行,则可以稍微修改算法。
async function batchQuery(queries, limit) {
limit = Math.min(queries.length, limit);
return new Promise((resolve, reject) => {
const responsesOrErrors = new Array(queries.length);
let startedCount = 0;
let finishedCount = 0;
let hasErrors = false;
function recursiveQuery() {
let index = startedCount++;
doQuery(queries[index])
.then(res => {
responsesOrErrors[index] = res;
})
.catch(error => {
responsesOrErrors[index] = error;
hasErrors = true;
})
.finally(() => {
finishedCount++;
if (finishedCount === queries.length) {
hasErrors ? reject(responsesOrErrors) : resolve(responsesOrErrors);
} else if (startedCount < queries.length) {
recursiveQuery();
}
});
}
for (let i = 0; i < limit; i++) {
recursiveQuery();
}
});
}
async function doQuery(query) {
console.log(`${query} started`);
const delay = Math.floor(Math.random() * 1500);
return new Promise((resolve, reject) => {
setTimeout(() => {
if (delay <= 1000) {
console.log(`${query} finished successfully`);
resolve(`${query} success`);
} else {
console.log(`${query} finished with error`);
reject(`${query} error`);
}
}, delay);
});
}
const queries = new Array(10).fill('query').map((query, index) => `${query}_${index + 1}`);
batchQuery(queries, 3)
.then(responses => console.log('All successfull', responses))
.catch(responsesWithErrors => console.log('All with several failed', responsesWithErrors));
所以我试图使一些显示的示例适用于我的代码,但由于这仅适用于导入脚本而不是生产代码,因此使用 npm 包批处理承诺对我来说肯定是最简单的途径
注意:需要运行时支持承诺或填充
。应用程序接口batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee(承诺:迭代将在每批之后调用。
用:
batch-promises
Easily batch promises
NOTE: Requires runtime to support Promise or to be polyfilled.
Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.
Use:
import batchPromises from 'batch-promises';
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
// The iteratee will fire after each batch resulting in the following behaviour:
// @ 100ms resolve items 1 and 2 (first batch of 2)
// @ 200ms resolve items 3 and 4 (second batch of 2)
// @ 300ms resolve remaining item 5 (last remaining batch)
setTimeout(() => {
resolve(i);
}, 100);
}))
.then(results => {
console.log(results); // [1,2,3,4,5]
});
如果你不想使用外部库,递归就是答案
downloadAll(someArrayWithData){
var self = this;
var tracker = function(next){
return self.someExpensiveRequest(someArrayWithData[next])
.then(function(){
next++;//This updates the next in the tracker function parameter
if(next < someArrayWithData.length){//Did I finish processing all my data?
return tracker(next);//Go to the next promise
}
});
}
return tracker(0);
}
扩展@deceleratedcaviar发布的答案,我创建了一个"批处理"实用程序函数,该函数以参数为:值数组、并发限制和处理函数。是的,我意识到使用 Promise.all 这种方式更类似于批处理而不是真正的并发性,但如果目标是一次限制过多的 HTTP 调用,我会使用这种方法,因为它很简单并且不需要外部库。
async function batch(o) {
let arr = o.arr
let resp = []
while (arr.length) {
let subset = arr.splice(0, o.limit)
let results = await Promise.all(subset.map(o.process))
resp.push(results)
}
return [].concat.apply([], resp)
}
let arr = []
for (let i = 0; i < 250; i++) { arr.push(i) }
async function calc(val) { return val * 100 }
(async () => {
let resp = await batch({
arr: arr,
limit: 100,
process: calc
})
console.log(resp)
})();
另一个带有自定义承诺库 (CPromise( 的解决方案:
- 使用生成器 实时代码沙盒演示
import { CPromise } from "c-promise2";
import cpFetch from "cp-fetch";
const promise = CPromise.all(
function* () {
const urls = [
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
];
for (const url of urls) {
yield cpFetch(url); // add a promise to the pool
console.log(`Request [${url}] completed`);
}
},
{ concurrency: 2 }
).then(
(v) => console.log(`Done: `, v),
(e) => console.warn(`Failed: ${e}`)
);
// yeah, we able to cancel the task and abort pending network requests
// setTimeout(() => promise.cancel(), 4500);
- 使用映射器实时代码沙盒演示
import { CPromise } from "c-promise2";
import cpFetch from "cp-fetch";
const promise = CPromise.all(
[
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
],
{
mapper: (url) => {
console.log(`Request [${url}]`);
return cpFetch(url);
},
concurrency: 2
}
).then(
(v) => console.log(`Done: `, v),
(e) => console.warn(`Failed: ${e}`)
);
// yeah, we able to cancel the task and abort pending network requests
//setTimeout(() => promise.cancel(), 4500);
警告 这还没有针对效率进行基准测试,并且执行了大量的数组复制/创建
如果你想要一个更实用的方法,你可以做这样的事情:
import chunk from 'lodash.chunk';
const maxConcurrency = (max) => (dataArr, promiseFn) =>
chunk(dataArr, max).reduce(
async (agg, batch) => [
...(await agg),
...(await Promise.all(batch.map(promiseFn)))
],
[]
);
然后你可以像这样使用它:
const randomFn = (data) =>
new Promise((res) => setTimeout(
() => res(data + 1),
Math.random() * 1000
));
const result = await maxConcurrency(5)(
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
randomFn
);
console.log('result+++', result);
我有创建块并使用 .reduce 函数等待每个块承诺.alls 完成的解决方案。如果承诺有一些调用限制,我还会添加一些延迟。
export function delay(ms: number) {
return new Promise<void>((resolve) => setTimeout(resolve, ms));
}
export const chunk = <T>(arr: T[], size: number): T[][] => [
...Array(Math.ceil(arr.length / size)),
].map((_, i) => arr.slice(size * i, size + size * i));
const myIdlist = []; // all items
const groupedIdList = chunk(myIdList, 20); // grouped by 20 items
await groupedIdList.reduce(async (prev, subIdList) => {
await prev;
// Make sure we wait for 500 ms after processing every page to prevent overloading the calls.
const data = await Promise.all(subIdList.map(myPromise));
await delay(500);
}, Promise.resolve());
可以使用 https://www.npmjs.com/package/job-pipe 来限制对服务器的请求
基本上,你创建一个管道并告诉它你想要多少个并发请求:
const pipe = createPipe({ throughput: 6, maxQueueSize: Infinity })
然后,你获取执行调用的函数,并强制它通过管道同时创建有限数量的调用:
const makeCall = async () => {...}
const limitedMakeCall = pipe(makeCall)
最后,您可以根据需要多次调用此方法,就好像它没有更改一样,它将限制自己可以处理的并行执行次数:
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
....
await limitedMakeCall()
利润。
我建议不要下载软件包,也不要编写数百行代码:
async function async_arr<T1, T2>(
arr: T1[],
func: (x: T1) => Promise<T2> | T2, //can be sync or async
limit = 5
) {
let results: T2[] = [];
let workers = [];
let current = Math.min(arr.length, limit);
async function process(i) {
if (i < arr.length) {
results[i] = await Promise.resolve(func(arr[i]));
await process(current++);
}
}
for (let i = 0; i < current; i++) {
workers.push(process(i));
}
await Promise.all(workers);
return results;
}
这是我的食谱,基于killdash9的答案。它允许选择异常的行为(Promise.all
vs Promise.allSettled
(。
// Given an array of async functions, runs them in parallel,
// with at most maxConcurrency simultaneous executions
// Except for that, behaves the same as Promise.all,
// unless allSettled is true, where it behaves as Promise.allSettled
function concurrentRun(maxConcurrency = 10, funcs = [], allSettled = false) {
if (funcs.length <= maxConcurrency) {
const ps = funcs.map(f => f());
return allSettled ? Promise.allSettled(ps) : Promise.all(ps);
}
return new Promise((resolve, reject) => {
let idx = -1;
const ps = new Array(funcs.length);
function nextPromise() {
idx += 1;
if (idx < funcs.length) {
(ps[idx] = funcs[idx]()).then(nextPromise).catch(allSettled ? nextPromise : reject);
} else if (idx === funcs.length) {
(allSettled ? Promise.allSettled(ps) : Promise.all(ps)).then(resolve).catch(reject);
}
}
for (let i = 0; i < maxConcurrency; i += 1) nextPromise();
});
}
<</div>
div class="answers"> 控制承诺/请求的最大数量的一个好解决方案是将请求列表拆分为多个页面,并且一次只生成一个页面的请求。
下面的示例使用了迭代操作库:
import {pipeAsync, map, page} from 'iter-ops';
const i = pipeAsync(
users, // make it asynchronous
page(10), // split into pages of 10 items in each
map(p => Promise.all(p.map(u => u.remoteServer.getCount(u)))), // map into requests
wait() // resolve each page in the pipeline
);
// below triggers processing page-by-page:
for await(const p of i) {
//=> p = resolved page of data
}
这样,它就不会尝试创建超过一个页面大小的请求/承诺。
在这里的代码中使用Promise.race
所做的
const identifyTransactions = async function() {
let promises = []
let concurrency = 0
for (let tx of this.transactions) {
if (concurrency > 4)
await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
promises.push(tx.identifyTransaction())
concurrency++
}
if (promises.length > 0)
await Promise.race(promises) //resolve the rest
}
如果您想查看示例:https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
- 在localhost Dev Box上测试JSONP请求的最佳方式
- 有条件更新d3.js力图中节点的最佳方法
- 为react组件传递道具的最佳方式
- 与运行长作业(javascript,node.js)的第三方API同步的最佳实践
- 让Webpack管理Quirky AMD定义的最佳方式
- 在承诺链中处理早期回报的最佳方式
- 将jQuery.ech()方法转换为本地JavaScript抽象的最佳方法是什么
- Angularjs 1.5.x本地化最佳实践
- 处理浮点错误的最佳方法是什么
- javascript导入的最佳实践是什么
- MobileFirst:在客户端运行计时器作业-最佳选项
- 在ng重复循环中显示条件内容的最佳方式是什么
- 在phonegap中为android调用onload函数的最佳方式
- 实现比较方法的最佳实践是什么;s的比较类型是在运行时选择的
- 从数组中删除元素的最佳方法是:javascript/jquery
- 链接两个网页或网络应用程序的最佳方式
- 什么's是连接供应商js文件的最佳方式
- Express服务器中语言子域的最佳实践
- 什么's是在javascript中迭代项的最佳方式
- 使用 ES6 的 Promise.all() 时限制并发性的最佳方法是什么?