使用node.js将非常大的json文件索引/摄取到数据库中

index/ingest very large json file to database with node.js

本文关键字:索引 数据库 文件 json js node 非常 使用      更新时间:2024-03-18

我继承了一个巨大的json文件,我正试图将其索引到elasticsearch中(不是真正的数据库,但不要挂起它应该适用于大多数数据库摄取)。我正在使用node进行摄取。我尝试过流和异步,但我很困惑——我没有解决这个问题的框架——没有内存溢出之类的。

我不能发布1比1,但它实际上是一个多维对象,看起来像:

[ 
   { 
     document: {
        type: 1,
        type2: 2,
        type3: {...}
    },
    {...}
]

我只需要吸收文档,我可以使用弹性搜索客户端并批量处理它们。我需要放慢流、解析和块的速度。

完全卡住了。。。帮斯塔克佛流吧,今天是星期五,我想回家。

基于migg对json解析流的建议——我尝试过的第三个json流库——我终于有了一个工作的摄取。事实上,当我写这篇文章的时候,它正在运行。希望有人会发现这个有用。

const fs = require('graceful-fs');
const parse = require('json-parse-stream');
const es = require('event-stream');
const client = new require('elasticsearch').Client();
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var TransformToBulk = require('elasticsearch-streams').TransformToBulk;

var rs = fs.createReadStream('./resources/mydoc.json');
var bulkExec = function (body, callback) {
  console.log(body);
  client.bulk({
    index: 'my_index',
    type: 'my_type',
    body: body
  }, callback);
};
var toBulk = new TransformToBulk(() => { return { _index: 'my_index', _type: 'my_type' }; });

const done = (err, res) =>  {
  if (err) {
    console.log(err);
  }
  console.log(res);
  console.log('go get a drink you deserve it');
};
var ws = new WritableBulk(bulkExec);
rs.pipe(parse())
.pipe(es.mapSync(function (element) {
  var a =  [];
  if (element.key === 'document') {
    a = element.value;
    return a;
  }
}))
.pipe(toBulk)
.pipe(ws).on('finish', done);