实现缓冲转换流

Implementing a buffered transform stream

本文关键字:转换 缓冲 实现      更新时间:2024-01-03

我正在尝试用新的Node.js流API实现一个流,该流将缓冲一定量的数据。当这个流通过管道传输到另一个流时,或者如果某个东西消耗了readable事件,这个流应该刷新它的缓冲区,然后简单地变成直通。问题是,这个流将通过管道连接到许多其他流,当每个目标流都被连接时,缓冲区必须被刷新,即使它已经被刷新到另一个流

例如:

  1. BufferStream实现stream.Transform,并保留512KB的内部环形缓冲区
  2. ReadableStreamA通过管道连接到BufferStream的实例
  3. BufferStream写入其环形缓冲区,在数据进入时从ReadableStreamA读取数据。(数据是否丢失并不重要,因为缓冲区会覆盖旧数据。)
  4. BufferStream通过管道连接至WritableStreamB
  5. WritableStreamB接收整个512KB的缓冲区,并在从ReadableStreamABufferStream写入数据时继续获取数据
  6. BufferStream通过管道连接至WritableStreamC
  7. WritableStreamC也接收整个512KB的缓冲区,但这个缓冲区现在与WritableStreamB接收的不同,因为此后有更多的数据被写入BufferStream

对于API流,这可能吗?我能想到的唯一方法是创建一个对象,该对象的方法为每个目的地旋转一个新的PassThrough流,这意味着我不能简单地通过管道往返于它。

值得一提的是,我通过简单地监听data事件上的新处理程序,使用旧的"流动"API完成了这项工作。当.on('data')附加了一个新函数时,我会用环形缓冲区的副本直接调用它。

以下是我对您的问题的看法。

基本思想是创建一个Transform流,这将允许我们在流的输出上发送数据之前执行您的自定义缓冲逻辑:

var util = require('util')
var stream = require('stream')
var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}
util.inherits(BufferStream, stream.Transform)
BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = new Buffer(chunk)
  this.push(chunk)
  done()
}

然后,我们需要覆盖.pipe()方法,以便在BufferStream通过管道传输到流中时通知我们,这允许我们自动向其写入数据:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

通过这种方式,当我们写入buffer.pipe(someStream)时,我们按照预期执行管道,并将内部缓冲区写入输出流。在那之后,Transform类处理所有事情,同时跟踪背压等等。

以下是工作要点。请注意,我没有麻烦编写正确的缓冲逻辑(即,我不关心内部缓冲区的大小),但这应该很容易修复。

Paul的回答很好,但我认为它不符合确切的要求。听起来需要做的是,每次对该转换流调用pipe()时,它都需要首先刷新缓冲区,该缓冲区表示从创建/(连接到源流)到连接到当前可写/目标流之间的所有数据累积。

像这样的东西可能更正确:

  var BufferStream = function () {
        stream.Transform.apply(this, arguments);
        this.buffer = []; //I guess an array will do
    };
    util.inherits(BufferStream, stream.Transform);
    BufferStream.prototype._transform = function (chunk, encoding, done) {
        this.push(chunk ? String(chunk) : null);
        this.buffer.push(chunk ? String(chunk) : null);
        done()
    };
    BufferStream.prototype.pipe = function (destination, options) {
        var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
        this.buffer.forEach(function (b) {
            res.write(String(b));
        });
        return res;
    };

    return new BufferStream();

我想是这样的:

BufferStream.super_.prototype.pipe.apply(this, arguments);

相当于:

stream.Transform.prototype.pipe.apply(this, arguments);

您可能会对此进行优化,并在调用pipe/unpipe时使用一些标志。