“没有光纤运行”,同时尝试逐行解析 csv
"No fiber running" while trying to parse a csv line by line
我试图了解如何在fast-csv中使用Fiber来制作逐行读取器(单用户命令行脚本),该读取器在每一行暂停读取/处理,直到该行完成各种异步调用。 (在不滚动我自己的 csv 代码的情况下,我想使用已经弄清楚 csv 格式问题的东西)
如果我这样做
var csv = require("fast-csv");
var CSV_STRING = 'a,b'n' +
'a1,b1'n' +
'a2,b2'n';
csv
.fromString(CSV_STRING, {headers: false})
.on("record", function (data) {
console.log("line="+JSON.stringify(data));
setTimeout(function(){
console.log("timeout");
},2000);
})
.on("end", function () {
console.log("done parsing CSV records");
});
console.log("done initializing csv parse");
我得到了我所期望的:
done initializing csv parse
line=["a","b"]
line=["a1","b1"]
line=["a2","b2"]
done parsing CSV records
timeout
timeout
timeout
如果我尝试在每条记录后使用光纤来产生
Fiber(
function () {
var fiber = Fiber.current;
csv
.fromString(CSV_STRING, {headers: false})
.on("record", function (data) {
console.log("line="+JSON.stringify(data));
setTimeout(function(){
console.log("timeout");
fiber.run();
},2000);
Fiber.yield();
})
.on("end", function () {
console.log("done parsing CSV records");
});
console.log("done initializing csv parse");
}).run();
我得到
done initializing csv parse
line=["a","b"]
events.js:141
throw er; // Unhandled 'error' event
^
Error: yield() called with no fiber running
我想我明白发生了什么,Fiber().run() 中的代码完成,所以它在调用产量之前离开了光纤,所以当它达到产量时,就不再有纤维了。(因此出现了巧妙的错误消息"没有光纤运行")
在完成解析之前,我保持光纤运行的合适方法是什么?
似乎是一个如此简单的问题,但我没有看到明显的答案? 起初我想在它离开 Future().run() 之前放一个产量,但这不起作用,因为第一个 fiber.run() 会让它再次离开纤维。
我想要的是流程是这样的:
done initializing csv parse
line=["a","b"]
timeout
line=["a1","b1"]
timeout
line=["a2","b2"]
timeout
done parsing CSV records
但是,如果不重新设计 Fast-CSV 的内部,这也许是不可能的,因为它控制着每条记录的事件触发时间。 我目前的想法是,在 fast-csv 中触发每个事件时,必须让出,并让用户在 csv.on("record") 中处理事件,将控制权交还给在 fast-csv 中解析 csv 的循环。
节点:v5.4.0
好吧,这是获得这种行为的一种方法。 我使用 es6 生成器逐行读取原始文件,然后在 fast-csv 库上使用生成器从逐行读取中解析原始字符串,这导致非异步执行流和输出类似于旧的单用户命令行脚本。
'use strict';
var csv = require("fast-csv");
var sfs = require('./sfs');
function parse(line) {
csv
.fromString(line, {headers: false})
.on("record", function (data) {
it.next(data);
});
}
function *main() {
// Make sure to initialize with a max buffer big enough to span any possible line length. Otherwise undefined
var fs = new sfs(it, 4096);
var result=yield fs.open("data.csv");
var line;
while((line=yield fs.readLine()) != null) {
console.log("line="+line);
var csvData=yield parse(line);
console.log("value1="+csvData[0]+" value2="+csvData[1]);
}
console.log("DONE");
}
var it = main();
it.next(); // get it all started
以及一个庸医(快速和笨拙)的类来包装我需要的 fs 东西。 我相信有一种更好的方法来做我所做的事情,但它适合我的需求。
SFS.js
'use strict';
var fs=require('fs')
class sfs {
constructor(it, maxbufsize) {
this.MAX_BUF=maxbufsize;
this.it=it;
this.fd=null;
this.lineBuf="";
this.buffer=new Buffer(this.MAX_BUF);
this.buflen=0;
}
open(file) {
var parent=this;
fs.open(file,'r',function(err,fd){
parent.fd=fd;
var parent2=parent;
fs.fstat(fd,function(err, stats){
parent2.stats=stats;
parent2.it.next(stats);
})
})
}
readLine(){
var parent = this;
var i=0
var s=this.stats.size
var line="";
var index=this.MAX_BUF-this.buflen;
// read data into buffer, buffer may already have data from previous read that was shifted left over extracted line
fs.read(this.fd,this.buffer,this.MAX_BUF-index,index,null,function(err,len,buf){
var expectedReadLen=parent.MAX_BUF-parent.buflen;
if(len < expectedReadLen) { // If we didn't read enough to backfill buffer, lets make sure the string is terminated
// as it shifts left so we don't try interpret older records to the right
parent.buffer.fill(' ',parent.buflen+len,parent.MAX_BUF);
}
parent.buflen+=len; // whatever was in buffer has more now
index=parent.buffer.indexOf(''n');
if(index > -1) {
line=parent.buffer.toString('utf8',0,index);
buf.copy(parent.buffer,0,index+1,parent.buflen); // shift unused data left
parent.buflen-=(index+1); // buffer left over after removing /n terminated line
if(len<expectedReadLen) { // If we didn't read enough to backfill buffer, lets make sure we erase old data
parent.buffer.fill(' ',parent.buflen,parent.MAX_BUF);
}
} else {
if(parent.buflen > 0) {
line=parent.buffer.toString('utf8',0,parent.buflen);
parent.buflen=0;
} else {
line=null;
}
}
parent.it.next(line);
});
}
close() {
fs.close(this.fd);
}
}
module.exports=sfs;
流是可暂停/可恢复的:
var csv = require("fast-csv");
var CSV_STRING = 'a,b'n' +
'a1,b1'n' +
'a2,b2'n';
var stream = csv.fromString(CSV_STRING, { headers: false })
.on("data", function (data) {
// pause the stream
stream.pause();
console.log("line: " + JSON.stringify(data));
setTimeout(function () {
// all async stuff are done, resume the stream
stream.resume();
console.log("timeout");
}, 2000);
}).on("end", function () {
console.log("done parsing CSV records");
});
控制台输出几乎正是您想要的:
/*
line: ["a","b"]
timeout
line: ["a1","b1"]
timeout
line: ["a2","b2"]
done parsing CSV records
timeout
*/
我能问一下为什么你绝对需要同步阅读你的csv吗?
- NodeJS-readline暂停和恢复事件发射器(逐行读取)
- 使用Javascript从一个文本区域逐行解析.subsr(0,6)到另一个文本区
- 从 Node.js 模块中的缓冲区实例中逐行读取字符串
- 读取制表符分隔的文件,逐行读取,而不是使用制表符分隔拆分每一行
- 节点画布:将PNG文件输出为交错/逐行扫描
- 如何在Web SQL数据库中逐行插入文本文件
- 如何在“逐行”npm 中遍历所有行
- 动画图表js(线),动画图表逐行绘制
- NodeJS - 逐行读取函数 + while 循环
- 如果可能的话,我需要帮助了解这个jQuery过滤器函数是如何工作的,逐行工作
- 节点.JS逐行读取字符串
- 逐行缩进 Javascript 文件
- 逐行阅读文本区域中的文本
- “没有光纤运行”,同时尝试逐行解析 csv
- D3 - 以设置的延迟逐行加载和显示数据
- j查询到特定元素后按类逐行目标表
- 如何使用谷歌应用脚本和谷歌工作表数据逐行发送html电子邮件
- 使用 javaScript 逐行读取文件
- NodeJS 流解析并根据承诺结果逐行写入 json
- 使用 node.js 和 ES6/ES7 功能逐行读取 CSV 文件