使用mongoose保存一个非常大的CSV到mongoDB

Save a very big CSV to mongoDB using mongoose

本文关键字:非常 CSV mongoDB 一个 保存 mongoose 使用      更新时间:2023-09-26

我有一个包含超过20万行的CSV文件。我需要把它保存到MongoDB。

如果我尝试for循环,Node将耗尽内存。

fs.readFile('data.txt', function(err, data) {
  if (err) throw err;
  data.split(''n');
  for (var i = 0; i < data.length, i += 1) {
    var row = data[i].split(',');
    var obj = { /* The object to save */ }
    var entry = new Entry(obj);
    entry.save(function(err) {
      if (err) throw err;
    }
  } 
}

如何避免内存耗尽?

欢迎收看流媒体。你真正想要的是一个"事件流"来处理你的输入"一次一个块",当然最好是用一个通用的分隔符,比如你当前使用的"换行符"。

对于真正高效的东西,你可以添加使用MongoDB的"批量API"插入,使你的加载尽可能快,而不会消耗所有的机器内存或CPU周期。

不提倡,因为有各种可用的解决方案,但这里有一个清单,利用行输入流包使"行结束符"部分简单。

仅通过"example"定义模式:

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("'n");
mongoose.connection.on("open",function(err,conn) { 
    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;
    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });
    stream.on("line",function(line) {
        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation
                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.
                    counter++;
                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );
    });
    stream.on("end",function() {
        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });
});

所以一般来说,这里的"流"接口"分解输入",以便"一次一行"地处理。这样可以避免一次加载所有内容。

主要部分是MongoDB的"批量操作API"。这允许您在实际发送到服务器之前一次"排队"许多操作。所以在这种情况下,使用"模",每处理1000个条目才发送写操作。您实际上可以在16MB BSON限制下执行任何操作,但要保持可管理性。

除了批量处理的操作之外,异步库中还有一个额外的"限制器"。这实际上并不是必需的,但这确保了在任何时候处理的文档基本上不超过"模限制"。一般的批处理"插入"除了内存之外不需要IO开销,但是"执行"调用意味着IO正在处理。所以我们等待而不是排队。

肯定有更好的解决方案,你可以找到"流处理"CSV类型的数据,这似乎是。但总的来说,这给了你一个概念,如何在不消耗CPU周期的情况下以一种有效的方式做到这一点。

公认的答案很好,并试图涵盖这个问题的所有重要方面。

  1. 读取CSV文件作为行流
  2. 批量向MongoDB写入文档
  3. 读写同步

虽然它在前两个方面做得很好,但使用async.series()解决同步问题的方法不会像预期的那样工作。

stream.on("line",function(line) {
    async.series(
        [
            function(callback) {
                var row = line.split(",");     // split the lines on delimiter
                var obj = {};             
                // other manipulation
                bulk.insert(obj);  // Bulk is okay if you don't need schema
                                   // defaults. Or can just set them.
                counter++;
                if ( counter % 1000 == 0 ) {
                    bulk.execute(function(err,result) {
                        if (err) throw err;   // or do something
                        // possibly do something with result
                        bulk = Entry.collection.initializeOrderedBulkOp();
                        callback();
                    });
                } else {
                    callback();
                }
           }
       ],
       function (err) {
           // each iteration is done
       }
   );
});

这里bulk.execute()是一个mongodb写操作,它是一个异步IO调用。这允许node.js在bulk.execute()完成它的db写和回调之前继续进行事件循环。

所以它可能会继续从流中接收更多的'line'事件并排队更多的文档bulk.insert(obj),并且可以点击下一个模来再次触发bulk.execute()。

让我们看一下这个例子。

var async = require('async');
var bulk = {
    execute: function(callback) {
        setTimeout(callback, 1000);
    }
};
async.series(
    [
       function (callback) {
           bulk.execute(function() {
              console.log('completed bulk.execute');
              callback(); 
           });
       },
    ], 
    function(err) {
    }
);
console.log("!!! proceeding to read more from stream");

它的输出

!!! proceeding to read more from stream
completed bulk.execute

要真正确保我们在任何给定时间处理一批N个文档,我们需要使用stream.pause() &stream.resume()

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false });
var Entry = mongoose.model( "Entry", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("'n");
mongoose.connection.on("open",function(err,conn) { 
    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;
    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });
    stream.on("line",function(line) {
        var row = line.split(",");     // split the lines on delimiter
        var obj = {};             
        // other manipulation
        bulk.insert(obj);  // Bulk is okay if you don't need schema
                           // defaults. Or can just set them.
        counter++;
        if ( counter % 1000 === 0 ) {
            stream.pause(); //lets stop reading from file until we finish writing this batch to db
            bulk.execute(function(err,result) {
                if (err) throw err;   // or do something
                // possibly do something with result
                bulk = Entry.collection.initializeOrderedBulkOp();
                stream.resume(); //continue to read from file
            });
        }
    });
    stream.on("end",function() {
        if ( counter % 1000 != 0 ) {
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
        }
    });
});