“没有光纤运行”,同时尝试逐行解析 csv

"No fiber running" while trying to parse a csv line by line

本文关键字:逐行 csv 光纤 运行      更新时间:2023-09-26

我试图了解如何在fast-csv中使用Fiber来制作逐行读取器(单用户命令行脚本),该读取器在每一行暂停读取/处理,直到该行完成各种异步调用。 (在不滚动我自己的 csv 代码的情况下,我想使用已经弄清楚 csv 格式问题的东西)

如果我这样做

var csv = require("fast-csv");
var CSV_STRING = 'a,b'n' +
'a1,b1'n' +
'a2,b2'n';
csv
.fromString(CSV_STRING, {headers: false})
.on("record", function (data) {
    console.log("line="+JSON.stringify(data));
    setTimeout(function(){
        console.log("timeout");
    },2000);
})
.on("end", function () {
    console.log("done parsing CSV records");
});
console.log("done initializing csv parse");

得到了我所期望的:

done initializing csv parse
line=["a","b"]
line=["a1","b1"]
line=["a2","b2"]
done parsing CSV records
timeout
timeout
timeout

如果我尝试在每条记录后使用光纤来产生

Fiber(
    function () {
        var fiber = Fiber.current;
        csv
            .fromString(CSV_STRING, {headers: false})
            .on("record", function (data) {
                console.log("line="+JSON.stringify(data));
                setTimeout(function(){
                    console.log("timeout");
                    fiber.run();
                },2000);
                Fiber.yield();
            })
            .on("end", function () {
                console.log("done parsing CSV records");
            });
        console.log("done initializing csv parse");
    }).run();

我得到

done initializing csv parse
line=["a","b"]
events.js:141
      throw er; // Unhandled 'error' event
      ^
Error: yield() called with no fiber running

我想我明白发生了什么,Fiber().run() 中的代码完成,所以它在调用产量之前离开了光纤,所以当它达到产量时,就不再有纤维了。(因此出现了巧妙的错误消息"没有光纤运行")

在完成解析之前,我保持光纤运行的合适方法是什么?

似乎是一个如此简单的问题,但我没有看到明显的答案? 起初我想在它离开 Future().run() 之前放一个产量,但这不起作用,因为第一个 fiber.run() 会让它再次离开纤维。

我想要的是流程是这样的:

done initializing csv parse
line=["a","b"]
timeout
line=["a1","b1"]
timeout
line=["a2","b2"]
timeout
done parsing CSV records

但是,如果不重新设计 Fast-CSV 的内部,这也许是不可能的,因为它控制着每条记录的事件触发时间。 我目前的想法是,在 fast-csv 中触发每个事件时,必须让出,并让用户在 csv.on("record") 中处理事件,将控制权交还给在 fast-csv 中解析 csv 的循环。

节点:v5.4.0

好吧,这是获得这种行为的一种方法。 我使用 es6 生成器逐行读取原始文件,然后在 fast-csv 库上使用生成器从逐行读取中解析原始字符串,这导致非异步执行流和输出类似于旧的单用户命令行脚本。

'use strict';
var csv = require("fast-csv");
var sfs = require('./sfs');
function parse(line) {
    csv
        .fromString(line, {headers: false})
        .on("record", function (data) {
            it.next(data);
        });
}
function *main() {
    // Make sure to initialize with a max buffer big enough to span any possible line length.  Otherwise undefined
    var fs = new sfs(it, 4096);
    var result=yield fs.open("data.csv");
    var line;
    while((line=yield fs.readLine()) != null) {
        console.log("line="+line);
        var csvData=yield parse(line);
        console.log("value1="+csvData[0]+" value2="+csvData[1]);
    }
    console.log("DONE");
}
var it = main();
it.next(); // get it all started

以及一个庸医(快速和笨拙)的类来包装我需要的 fs 东西。 我相信有一种更好的方法来做我所做的事情,但它适合我的需求。

SFS.js

'use strict';
var fs=require('fs')
class sfs {
    constructor(it, maxbufsize) {
        this.MAX_BUF=maxbufsize;
        this.it=it;
        this.fd=null;
        this.lineBuf="";
        this.buffer=new Buffer(this.MAX_BUF);
        this.buflen=0;
    }
    open(file) {
        var parent=this;
        fs.open(file,'r',function(err,fd){
            parent.fd=fd;
            var parent2=parent;
            fs.fstat(fd,function(err, stats){
                parent2.stats=stats;
                parent2.it.next(stats);
            })
        })
    }
    readLine(){
        var parent = this;
        var i=0
        var s=this.stats.size
        var line="";
        var index=this.MAX_BUF-this.buflen;
        // read data into buffer, buffer may already have data from previous read that was shifted left over extracted line
        fs.read(this.fd,this.buffer,this.MAX_BUF-index,index,null,function(err,len,buf){
            var expectedReadLen=parent.MAX_BUF-parent.buflen;
            if(len < expectedReadLen) {  // If we didn't read enough to backfill buffer, lets make sure the string is terminated
                // as it shifts left so we don't try interpret older records to the right
                parent.buffer.fill(' ',parent.buflen+len,parent.MAX_BUF);
            }
            parent.buflen+=len; // whatever was in buffer has more now
            index=parent.buffer.indexOf(''n');
            if(index > -1) {
                line=parent.buffer.toString('utf8',0,index);
                buf.copy(parent.buffer,0,index+1,parent.buflen); // shift unused data left
                parent.buflen-=(index+1); // buffer left over after removing /n terminated line
                if(len<expectedReadLen) {  // If we didn't read enough to backfill buffer, lets make sure we erase old data
                    parent.buffer.fill(' ',parent.buflen,parent.MAX_BUF);
                }
            } else {
                if(parent.buflen > 0) {
                    line=parent.buffer.toString('utf8',0,parent.buflen);
                    parent.buflen=0;
                } else {
                    line=null;
                }
            }
            parent.it.next(line);
        });
    }
    close() {
        fs.close(this.fd);
    }
}
module.exports=sfs;

流是可暂停/可恢复的:

var csv = require("fast-csv");
var CSV_STRING = 'a,b'n' +
    'a1,b1'n' +
    'a2,b2'n';
var stream = csv.fromString(CSV_STRING, { headers: false })
    .on("data", function (data) {
        // pause the stream
        stream.pause();
        console.log("line: " + JSON.stringify(data));
        setTimeout(function () {
            // all async stuff are done, resume the stream
            stream.resume();
            console.log("timeout");
        }, 2000);
    }).on("end", function () {
        console.log("done parsing CSV records");
    });

控制台输出几乎正是您想要的:

/*
line: ["a","b"]
timeout
line: ["a1","b1"]
timeout
line: ["a2","b2"]
done parsing CSV records
timeout
*/

我能问一下为什么你绝对需要同步阅读你的csv吗?