NodeJS 流解析并根据承诺结果逐行写入 json

NodeJS stream parse and write json line to line upon Promise result

本文关键字:逐行 结果 json 承诺 NodeJS      更新时间:2023-09-26

我有一个看起来像这样的大json文件:

[
 {"name": "item1"},
 {"name": "item2"},
 {"name": "item3"}
]

我想流式传输此文件(到目前为止非常简单),对于每一行,在解析/拒绝调用编辑此行时运行一个异步函数(返回一个承诺)。

输入文件的结果可能是:

[
 {"name": "item1", "response": 200},
 {"name": "item2", "response": 404},
 {"name": "item3"} // not processed yet
]

我不想创建另一个文件,我想即时编辑相同的文件(如果可能的话!

谢谢:)

我并没有真正回答这个问题,但无论如何都不要认为它能以令人满意的方式回答,所以这是我的 2 美分。

我假设您知道如何逐行流式传输并运行该功能,并且您遇到的唯一问题是编辑您正在读取的文件。

插入的后果

无法

将数据本机插入到任何文件中(这是您希望通过更改 JSON 实时执行的操作)。文件只能在其末尾长大。

因此,在 1GB 文件的开头插入 10 字节的数据意味着您需要将 1GB 写入磁盘(将所有数据进一步移动 10 个字节)。

您的文件系统不理解 JSON,只是看到您在大文件的中间插入字节,所以这会非常慢。

所以,是的,这是可能的。使用 insert() 方法在 NodeJS 中的文件 API 上编写包装器。

然后编写更多代码,以便能够知道将字节插入 JSON 文件的位置,而无需加载整个文件,也不会在最后生成无效的 JSON。

现在我不会推荐它:)

=> 阅读这个问题:是否可以在不重写的情况下将数据附加到文件中?

那为什么要这样做呢?

我假设想要要么

  • 能够随时终止您的进程,并通过再次读取文件轻松恢复工作。
  • 重试部分处理的文件以仅填充丢失的位。

第一种解决方案:使用数据库

抽象出在随机位置实时编辑文件需要完成的工作是数据库存在的唯一目的。

它们的存在只是为了抽象UPDATE mytable SET name = 'a_longer_name_that_the_name_that_was_there_before' where name = 'short_name'背后的魔力。

看看 LevelUP/Down、sqlite 等...

他们将抽象出需要在JSON文件中完成的所有魔术!

第二种解决方案:使用多个文件

流式传输文件时,请写入两个新文件!

  • 包含输入文件中的当前位置和需要重试的行
  • 另一个是预期的结果。

您还可以随时终止进程并重新启动

根据这个答案,在读取时写入同一个文件是不可靠的。正如那里的评论者所说,最好写入临时文件,然后删除原始文件并在其上重命名临时文件。

要创建行流,您可以使用署名。然后,对于每一行,应用一些操作并将其管道传输到输出文件。

像这样:

var fs = require('fs');
var stream = require('stream');
var util = require('util');
var LineStream = require('byline').LineStream;
function Modify(options) {
    stream.Transform.call(this, options);
}
util.inherits(Modify, stream.Transform);
Modify.prototype._transform = function(chunk, encoding, done) {
    var self = this;
    setTimeout(function() {
        // your modifications here, note that the exact regex depends on 
        // your json format and is probably the most brittle part of this
        var modifiedChunk = chunk.toString();
        if (modifiedChunk.search('response:[^,}]+') === -1) {
            modifiedChunk = modifiedChunk
                .replace('}', ', response: ' + new Date().getTime() + '}') + ''n';
        }      
        self.push(modifiedChunk);
        done();
    }, Math.random() * 2000 + 1000); // to simulate an async modification
};
var inPath = './data.json';
var outPath = './out.txt';
fs.createReadStream(inPath)
    .pipe(new LineStream())
    .pipe(new Modify())
    .pipe(fs.createWriteStream(outPath))
    .on('close', function() {
        // replace input with output
        fs.unlink(inPath, function() {
           fs.rename(outPath, inPath);
        });
    });

请注意,上述情况一次只发生一个异步操作。您还可以将修改保存到数组中,并在完成所有修改后将数组中的行写入文件,如下所示:

var fs = require('fs');
var stream = require('stream');
var LineStream = require('byline').LineStream;
var modifiedLines = [];
var modifiedCount = 0;
var inPath = './data.json';
var allModified = new Promise(function(resolve, reject) {
    fs.createReadStream(inPath).pipe(new LineStream()).on('data', function(chunk) {
       modifiedLines.length++;
       var index = modifiedLines.length - 1;
       setTimeout(function() {
           // your modifications here
           var modifiedChunk = chunk.toString();
           if (modifiedChunk.search('response:[^,}]+') === -1) {
               modifiedChunk = modifiedChunk
                   .replace('}', ', response: ' + new Date().getTime() + '}');
           }                      
           modifiedLines[index] = modifiedChunk;
           modifiedCount++;
           if (modifiedCount === modifiedLines.length) {
              resolve();
           }
       }, Math.random() * 2000 + 1000);
    });
}).then(function() {
    fs.writeFile(inPath, modifiedLines.join(''n'));
}).catch(function(reason) {
    console.error(reason);
});

如果您希望流式传输有效 json 块而不是行,这将是一种更强大的方法,请查看 JSONStream。

如注释中所述,您拥有的文件不是正确的 JSON,尽管在 Javascript 中有效。为了生成正确的 JSON,可以使用JSON.stringify()。我认为这也会使其他人难以解析非标准 JSON,因此我建议提供一个新的输出文件而不是保留原始文件。

但是,仍然可以将原始文件解析为 JSON。这可以通过eval('(' + procline + ')');,但是像这样将外部数据带入节点并不安全.js这样。

const fs = require('fs');
const readline = require('readline');
const fr = fs.createReadStream('file1');
const rl = readline.createInterface({
    input: fr
});

rl.on('line', function (line) {
    if (line.match(new RegExp("'{name"))) {
        var procline = "";
        if (line.trim().split('').pop() === ','){
            procline = line.trim().substring(0,line.trim().length-1);
        }
        else{
            procline = line.trim();
        }
        var lineObj = eval('(' + procline + ')');
        lineObj.response = 200;
        console.log(JSON.stringify(lineObj));
    }
});

输出将如下所示:

{"name":"item1","response":200}
{"name":"item2","response":200}
{"name":"item3","response":200}

它是行分隔的JSON(LDJSON),可用于流式传输内容,而无需前导和尾随[],。它还有一个ldjson-stream包。