如何在node.js可读流中调用异步函数

How to call an asynchronous function inside a node.js readable stream

本文关键字:调用 异步 函数 node js      更新时间:2023-09-26

这是一个实现自定义可读流的简短示例。该类名为MyStream。流从目录中获取文件/文件夹名,并将值推送到数据事件。

为了进行比较,我(在本例中)实现了两种不同的方式/功能。一个是同步的,另一个是异步的。构造函数的第二个参数让您决定使用哪种方式(true用于异步,false用于同步。

readcounter统计方法_read被调用的次数。只是为了提供反馈。

var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);
function MyStream(dirpath, async, opt) {
  Readable.call(this, opt);
  this.async = async;
  this.dirpath = dirpath;
  this.counter = 0;
  this.readcounter = 0;
}
MyStream.prototype._read = function() {
  this.readcounter++;
  if (this.async === true){
    console.log("Readcounter: " + this.readcounter);
    that = this;
    fs.readdir(this.dirpath,function(err, files){
      that.counter ++;
      console.log("Counter: " + that.counter);
      for (var i = 0; i < files.length; i++){
        that.push(files[i]);
      }
      that.push(null);
    });
  } else {
    console.log("Readcounter: " + this.readcounter);
    files = fs.readdirSync(this.dirpath)
    for (var i = 0; i < files.length; i++){
      this.push(files[i]);
    };
    this.push(null);
  }
};
//Instance for a asynchronous call
mystream = new MyStream('C:''Users', true);
mystream.on('data', function(chunk){
  console.log(chunk.toString());
});

同步方式的工作方式与预期的一样,但当我异步调用它时,会发生一些有趣的事情。每次通过that.push(files[i])推送文件名时,都会再次调用_read方法。当第一个异步循环结束并且that.push(null)定义流的末尾时,这会导致错误。

我用来测试的环境是:节点4.1.1,Electron 0.35.2。

我不明白为什么_read被称为n,以及为什么会发生这种情况。也许是虫子?或者有什么东西我现在看不见。有没有一种方法可以通过使用异步函数来构建可读流?异步地推送块将非常酷,因为这将是一种非阻塞流的方式。特别是当你有大量数据时。

_read在"读取器"需要数据时调用,通常在推送数据后调用。

我在直接实现_read时也遇到了同样的"问题",所以现在,我编写了一个返回流对象的函数。它工作得很好,数据不能从我的流中"提取",当我决定时,数据是可用的/推送的。举你的例子,我会这样做:

var Readable = require('stream').Readable;
var fs = require('fs');
function MyStream(dirpath, async, opt) {
  var rs = new Readable();
  // needed to avoid "Not implemented" exception
  rs._read = function() { 
    // console.log('give me data!'); // << this will print after every console.log(folder);
  };
  var counter = 0;
  var readcounter = 0;
  if (async) {
    console.log("Readcounter: " + readcounter);
    fs.readdir(dirpath, function (err, files) {
      counter++;
      console.log("Counter: " + counter);
      for (var i = 0; i < files.length; i++) {
        rs.push(files[i]);
      }
      rs.push(null);
    });
  } else {
    console.log("Readcounter: " + readcounter);
    files = fs.readdirSync(dirpath)
    for (var i = 0; i < files.length; i++) {
      rs.push(files[i]);
    };
    rs.push(null);
  }
  return rs;
}
var mystream = MyStream('C:''Users', true);
mystream.on('data', function (chunk) {
  console.log(chunk.toString());
});

它不会直接回答你的问题,但它是一种获得工作代码的方法。

自节点10以来已修复

https://github.com/nodejs/node/issues/3203

如果我的理解是正确的,那么在Node 10之前,异步_read()实现必须使用数据只调用this.push()一次,并创建自己的缓冲区,以便将后续this.push()延迟到下一个_read()调用。

const {Readable} = require('stream');
let i = 0;
const content_length = 5;
let content_read = 0;
const stream = new Readable({
  encoding: 'utf8',
  read() {
    console.log('read', ++i);
    const icopy = i;
    setTimeout(() => {
      for (let a=1; a<=3; a++) {
        this.push(icopy+':'+a);
      }
      content_read++;
      if (content_read == content_length) {
        console.log('close');
        this.push(null);
      }
    }, Math.floor(Math.random()*1000));
  },
});
stream.on('data', (data) => {
  console.log(data);
});

节点8.17.0:

read 1
1:1
read 2
1:2
read 3
1:3
read 4
2:1
read 5
2:2
read 6
2:3
read 7
6:1
read 8
6:2
read 9
6:3
read 10
9:1
read 11
9:2
read 12
9:3
read 13
12:1
read 14
12:2
read 15
12:3
read 16
close
events.js:183
      throw er; // Unhandled 'error' event
      ^
Error: stream.push() after EOF

节点10.24.1:

read 1
1:1
1:2
1:3
read 2
2:1
2:2
2:3
read 3
3:1
3:2
3:3
read 4
4:1
4:2
4:3
read 5
5:1
5:2
5:3
close