如何编写 Node.js 模块来处理传入的管道流

How do I write a Node.js module to handle an incoming piped stream

本文关键字:管道 处理 何编写 Node js 模块      更新时间:2023-09-26

我正在尝试编写一个接受传入管道二进制(或base-64编码)流的节点模块,但坦率地说,我什至不知道从哪里开始。我在 Node 文档中看不到任何关于处理传入流的示例;我只看到消费它们的例子?

例如,假设我希望能够做到这一点:

var asset = new ProjectAsset('myFile', __dirname + '/image.jpg')
var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }).pipe(asset)
stream.on('finish', function() {
    done()
})

ProjectAsset看起来像这样,但我不知道下一步该去哪里:

'use strict'
var stream = require('stream'),
    util = require('util')
var ProjectAsset = function() {
    var self = this
    Object.defineProperty(self, 'binaryData', {
        configurable: true,
        writable: true
    })
    stream.Stream.call(self)
    self.on('pipe', function(src) {
        // does it happen here? how do I set self.binaryData?
    })
    return self
}
util.inherits(ProjectAsset, stream.Stream)
module.exports = ProjectAsset
module.exports.DEFAULT_FILE_NAME = 'file'

可以从stream.Stream继承并使其工作,但是根据文档中提供的内容,我建议从stream.Writable继承。管道进入stream.Writable您需要定义_write(chunk, encoding, done)来处理管道。下面是一个示例:

var asset = new ProjectAsset('myFile', __dirname + '/image.jpg')
var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }).pipe(asset)
stream.on('finish', function() {
    console.log(asset.binaryData);
})

项目资产

'use strict'
var stream = require('stream'),
    util = require('util')
var ProjectAsset = function() {
    var self = this
    self.data
    self.binaryData = [];
    stream.Writable.call(self)
    self._write = function(chunk, encoding, done) {
        // Can handle this data however you want
        self.binaryData.push(chunk.toString())
        // Call after processing data
        done()
    }
    self.on('finish', function() {
        self.data = Buffer.concat(self.binaryData)
    })
    return self
}
util.inherits(ProjectAsset, stream.Writable)
module.exports = ProjectAsset
module.exports.DEFAULT_FILE_NAME = 'file'

如果您还希望从stream中读取,请查看从stream.Duplex继承以及包括_read(size)方法。

如果你正在做一些更简单的事情,还有简化的构造函数 api。

我不确定这是否是您要查找的,但我认为您可以使用缓冲区 Buffer.concat API 来处理它,并在可以在流data侦听器上从chunk检索的缓冲区数组

'use strict'
var stream = require('stream'),
    util = require('util');
var ProjectAsset = function() {
    var self = this
    Object.defineProperty(self, 'binaryData', {
        configurable: true,
        writable: true
    })
    stream.Stream.call(self)
    var data;
    var dataBuffer=[];
    self.on('data', function(chunk) {
        dataBuffer.push(chunk);
    }).on('end',function(){
        data=Buffer.concat(dataBuffer);
    });
    self.binaryData=data.toString('binary');
    return self
}
util.inherits(ProjectAsset, stream.Stream)
module.exports = ProjectAsset
module.exports.DEFAULT_FILE_NAME = 'file'

既然你使用var asset = new ProjectAsset('myFile', __dirname + '/image.jpg')我想你的项目资产职责是采取一些输入流做一些转换并将其写入文件。您可以实现转换流,因为您从流接收一些输入,并生成一些可以保存到文件或其他写入流的输出。

当然,您可以通过从节点继承来实现转换流.js但继承非常麻烦,所以我的实现使用 through2 来实现转换流:

module.exports = through2(function (chunk, enc, callback) {
  // This function is called whenever a piece of data from the incoming stream is read
  // Transform the chunk or buffer the chunk in case you need more data to transform
  // Emit a data package to the next stream in the pipe or omit this call if you need more data from the input stream to be read
  this.push(chunk);
  // Signal through2 that you processed the incoming data package
  callback();
 }))

用法

var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' })
               .pipe(projectAsset)
               .pipe(fs.createWriteStream(__dirname + '/image.jpg'));

如此示例所示,实现流管道完全分离数据转换和数据保存。

工厂功能

如果您更喜欢在项目资产模块中使用类似构造函数的方法,因为您需要传递一些值或内容,则可以轻松导出构造函数,如下所示

var through2 = require('through2');
module.exports = function(someData) {
  // New stream is returned that can use someData argument for doing things
  return through2(function (chunk, enc, callback) {
    // This function is called whenever a piece of data from the incoming stream is read
    // Transform the chunk or buffer the chunk in case you need more data to transform
    // Emit a data package to the next stream in the pipe or omit this call if you need more data from the input stream to be read
    this.push(chunk);
    // Signal through2 that you processed the incoming data package
    callback();
  });
}

用法

var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' })
               .pipe(projectAsset({ foo: 'bar' }))
               .pipe(fs.createWriteStream(__dirname + '/image.jpg'));