承诺合并流
Promise combine stream
我想逐行读取大文件,并将数据插入数据库存储中。我的函数在它创建的流中返回一个Promise,并在调用事件stream.on('end')
时解析它,但这不是我真正想要的,因为在stream.on('data')
中,它在每一行上都生成Promise.map()
,我想确保所有插入操作都在调用resolve()
之前完成。在这种情况下,我如何生产正确的链条?
var loadFromFile = (options) => new Promise(function (resolve, reject) {
let stream = fs.createReadStream(options.filePath, {
flags: 'r',
});
stream.on('data', (chunk) => {
/* process data chunk to string Array */
Promise.map(lines, (line) => {
/* process and insert each line here */
})
.catch((err)=>{
reject(err);
});
});
stream.on('end', () => {
if (someBuisnessLogicCheckHere) {
reject('Invalid data was found in file');
}
resolve(); // Here I am not sure, that all inserts for each chunk in Promise.map was completed
});
});
如果您想确保在映射操作中的所有promise都得到解决之前不会进行解决,请等待Promise.map
返回的promise;参见评论:
var loadFromFile = (options) => new Promise(function(resolve, reject) {
let stream = fs.createReadStream(options.filePath, {
flags: 'r',
});
let promises = []; // Array of promises we'll wait for
stream.on('data', (chunk) => {
promises.push( // Remember this promise
Promise.map(lines, (line) => {
/* process and insert each line here */
})
.catch((err) => {
reject(err);
})
);
});
stream.on('end', () => {
if (someBuisnessLogicCheckHere) {
reject('Invalid data was found in file');
}
Promise.all(promises).then(() => { resolve()}); // Wait for it before resolving
});
});
请注意,我不仅仅执行Promise.all(promises).then(resolve);
,因为这将向resolve
传递一组解析,而您的原始代码没有与调用者共享这些解析。
如果阵列在充满大多数已经解决的承诺时可能会变得很大,那么你可以在它们解决时主动删除它们,只等待最后剩下的承诺。
如果您需要使用管道方法中的暂停/恢复自动机制来处理文件,请查看超燃冲压发动机。
您的示例看起来会更加清晰,请参阅:
var loadFromFile = (options) =>
fs.createReadStream(options.filePath, {
flags: 'r',
})
.split(/'r?'n/)
// split by line by regex
.parse((line) => aPromiseReturningFunction(line))
// will wait until promises are resolved
.map((data) => doSomeAsyncAndReturnPromise(data))
// will wait as above
.accumulate((acc, resolved) => acc.push(resolved), [])
.then((arr) => {
if (someBuisnessLogicCheckHere) {
return Promise.reject('Invalid data was found in file');
}
// Here all your items are saved (i.e. all Promises resolved)
});
});
相关文章:
- JQuery合并了keyup和focusout两个函数
- 我的职位回报太快了,如何做出承诺
- 打破承诺链的好方法是什么
- 从函数返回角度承诺
- JSON重构(合并内容)与javascript
- 我怎样才能获得承诺的价值
- 延期承诺值未更新/解析/延期
- Javascript-根据赋值顺序,按键合并对象数组
- 在承诺链中处理早期回报的最佳方式
- 合并两个数组,重新调整循环js
- 承诺在非节点式回调上使用Bluebird
- 如何合并不同集合的游标并按日期排序
- 合并TinyMCE(jQuery插件)文件
- 简单的ES6承诺问题-交换解决和拒绝参数
- 组合承诺和非承诺值
- 带有对象/原型的链式承诺(Q延期)
- AngularJS$q承诺使用socket.io
- React JS:未捕获(在承诺中)语法错误:在位置 0 的 JSON 中意外<令牌
- 承诺合并流
- 进行多个 http 调用并使用承诺合并结果