承诺合并流

Promise combine stream

本文关键字:合并 承诺      更新时间:2023-09-26

我想逐行读取大文件,并将数据插入数据库存储中。我的函数在它创建的流中返回一个Promise,并在调用事件stream.on('end')时解析它,但这不是我真正想要的,因为在stream.on('data')中,它在每一行上都生成Promise.map(),我想确保所有插入操作都在调用resolve()之前完成。在这种情况下,我如何生产正确的链条?

    var loadFromFile = (options) => new Promise(function (resolve, reject) {
        let stream = fs.createReadStream(options.filePath, {
          flags: 'r',
        });
        stream.on('data', (chunk) => {
            /* process data chunk to string Array */  
            Promise.map(lines, (line) => {
            /* process and insert each line here */
            })
            .catch((err)=>{
                reject(err);
            });
          });
          stream.on('end', () => {
              if (someBuisnessLogicCheckHere) {
                reject('Invalid data was found in file');
              }
              resolve(); // Here I am not sure, that all inserts for each chunk in Promise.map was completed
          });
    });

如果您想确保在映射操作中的所有promise都得到解决之前不会进行解决,请等待Promise.map返回的promise;参见评论:

var loadFromFile = (options) => new Promise(function(resolve, reject) {
    let stream = fs.createReadStream(options.filePath, {
        flags: 'r',
    });
    let promises = [];                                 // Array of promises we'll wait for
    stream.on('data', (chunk) => {
        promises.push(                                 // Remember this promise
            Promise.map(lines, (line) => {
                /* process and insert each line here */
            })
            .catch((err) => {
                reject(err);
            })
        );
    });
    stream.on('end', () => {
        if (someBuisnessLogicCheckHere) {
            reject('Invalid data was found in file');
        }
        Promise.all(promises).then(() => { resolve()}); // Wait for it before resolving
    });
});

请注意,我不仅仅执行Promise.all(promises).then(resolve);,因为这将向resolve传递一组解析,而您的原始代码没有与调用者共享这些解析。

如果阵列在充满大多数已经解决的承诺时可能会变得很大,那么你可以在它们解决时主动删除它们,只等待最后剩下的承诺。

如果您需要使用管道方法中的暂停/恢复自动机制来处理文件,请查看超燃冲压发动机。

您的示例看起来会更加清晰,请参阅:

var loadFromFile = (options) => 
    fs.createReadStream(options.filePath, {
        flags: 'r',
    })
    .split(/'r?'n/) 
         // split by line by regex
    .parse((line) => aPromiseReturningFunction(line))
         // will wait until promises are resolved
    .map((data) => doSomeAsyncAndReturnPromise(data))      
         // will wait as above
    .accumulate((acc, resolved) => acc.push(resolved), [])
    .then((arr) => {
         if (someBuisnessLogicCheckHere) {
              return Promise.reject('Invalid data was found in file');
         }
         // Here all your items are saved (i.e. all Promises resolved)
     });
});