createReadStream在数据完成处理之前结束激发

createReadStream end fires before data finished processing

本文关键字:结束 处理 数据 createReadStream      更新时间:2023-10-28

我正在尝试执行以下操作:

  1. 逐行流式传输csv文件
  2. 修改每行中包含的数据
  3. 一旦所有行都被流式传输和处理,就完成并继续下一个任务

问题是.on("end").on("data")完成对每一行的处理之前激发。在.on("data")处理完所有行之后,我如何让.on("end")启动?

下面是我所说的一个简单的例子:

import parse from 'csv-parse'; 
var parser = parse({});
fs.createReadStream(this.upload.location)
.pipe(parser)
.on("data", line => {
  var num = Math.floor((Math.random() * 100) + 1);
  num = num % 3;
  num = num * 1000;
  setTimeout( () => { 
    console.log('data process complete');
  }, num);
})
.on("end", () => {
   console.log('Done: parseFile');
   next(null);
});

提前谢谢。

我认为问题在于data事件侦听器中的setTimeout(或任何其他异步任务)。enddata之后激发,但异步任务导致它记录消息,即使在流激发end之后也是如此。

如果你取出setTimeout,你会看到它记录了end之前的所有消息。您仍然可以执行异步任务,但可能会有一批异步任务在流结束后运行。

此代码有助于解释发生了什么:

const fs = require('fs')
const testFileName = 'testfile.txt'
fs.writeFileSync(testFileName, '123456789')
let count = 0
const readStream = fs.createReadStream(testFileName, {
  encoding: 'utf8',
  highWaterMark: 1  // low highWaterMark so we can have more chunks to observe
})
readStream.on('data', (data) => {
  console.log('+++++++++++processing sync+++++++++++++')
  console.log(data)
  console.log('+++++++++++end processing sync+++++++++++++')
  setTimeout(() => {
    console.log('-----------processing async-------------')
    console.log(data)
    console.log('-----------end processing async-------------')
  }, ++count * 1000)
})
readStream.on('end', () => {
  console.log('stream ended but still have async tasks doing their thing')
  fs.unlinkSync(testFileName)
})