使用mongoose保存一个非常大的CSV到mongoDB
Save a very big CSV to mongoDB using mongoose
我有一个包含超过20万行的CSV文件。我需要把它保存到MongoDB。
如果我尝试for循环,Node将耗尽内存。
fs.readFile('data.txt', function(err, data) {
if (err) throw err;
data.split(''n');
for (var i = 0; i < data.length, i += 1) {
var row = data[i].split(',');
var obj = { /* The object to save */ }
var entry = new Entry(obj);
entry.save(function(err) {
if (err) throw err;
}
}
}
如何避免内存耗尽?
欢迎收看流媒体。你真正想要的是一个"事件流"来处理你的输入"一次一个块",当然最好是用一个通用的分隔符,比如你当前使用的"换行符"。
对于真正高效的东西,你可以添加使用MongoDB的"批量API"插入,使你的加载尽可能快,而不会消耗所有的机器内存或CPU周期。
不提倡,因为有各种可用的解决方案,但这里有一个清单,利用行输入流包使"行结束符"部分简单。
仅通过"example"定义模式:
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
async = require("async"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("'n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
stream.pause();
bulk.execute(function(err,result) {
if (err) callback(err);
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
stream.on("end",function() {
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
});
});
所以一般来说,这里的"流"接口"分解输入",以便"一次一行"地处理。这样可以避免一次加载所有内容。
主要部分是MongoDB的"批量操作API"。这允许您在实际发送到服务器之前一次"排队"许多操作。所以在这种情况下,使用"模",每处理1000个条目才发送写操作。您实际上可以在16MB BSON限制下执行任何操作,但要保持可管理性。
除了批量处理的操作之外,异步库中还有一个额外的"限制器"。这实际上并不是必需的,但这确保了在任何时候处理的文档基本上不超过"模限制"。一般的批处理"插入"除了内存之外不需要IO开销,但是"执行"调用意味着IO正在处理。所以我们等待而不是排队。
肯定有更好的解决方案,你可以找到"流处理"CSV类型的数据,这似乎是。但总的来说,这给了你一个概念,如何在不消耗CPU周期的情况下以一种有效的方式做到这一点。
公认的答案很好,并试图涵盖这个问题的所有重要方面。
- 读取CSV文件作为行流
- 批量向MongoDB写入文档
- 读写同步
虽然它在前两个方面做得很好,但使用async.series()解决同步问题的方法不会像预期的那样工作。
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
bulk.execute(function(err,result) {
if (err) throw err; // or do something
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
这里bulk.execute()是一个mongodb写操作,它是一个异步IO调用。这允许node.js在bulk.execute()完成它的db写和回调之前继续进行事件循环。
所以它可能会继续从流中接收更多的'line'事件并排队更多的文档bulk.insert(obj)
,并且可以点击下一个模来再次触发bulk.execute()。
让我们看一下这个例子。
var async = require('async');
var bulk = {
execute: function(callback) {
setTimeout(callback, 1000);
}
};
async.series(
[
function (callback) {
bulk.execute(function() {
console.log('completed bulk.execute');
callback();
});
},
],
function(err) {
}
);
console.log("!!! proceeding to read more from stream");
它的输出
!!! proceeding to read more from stream
completed bulk.execute
要真正确保我们在任何给定时间处理一批N个文档,我们需要使用stream.pause()
&stream.resume()
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false });
var Entry = mongoose.model( "Entry", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("'n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 === 0 ) {
stream.pause(); //lets stop reading from file until we finish writing this batch to db
bulk.execute(function(err,result) {
if (err) throw err; // or do something
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume(); //continue to read from file
});
}
});
stream.on("end",function() {
if ( counter % 1000 != 0 ) {
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
}
});
});
- 在POST中将html表作为csv提交
- 通过CSV文件上载更新数据库表
- 在.csv文件中写入学位符号
- PERL-下载CSV文件不完整
- 将CSV文件从URL导入Node.js
- 通过javascript操作图像,非常简单
- Get方法获取csv文件的内容
- 如何将csv数据导入netsuite
- 将JSON转换为放入Sdcard中的CSV文件
- 将基本CSV数据加载到D3
- 非常简单的XMLHttpRequest不起作用
- 在csv文件名中使用西班牙语字符
- 如何使用d3.js从csv文件中绘制areaplot?请任何人分享代码
- 正在以javascript读取本地csv文件
- d3.csv有很多数据,只画了一个月
- 处理字段中带有换行符的csv文件-node.js
- 编辑CSV数组中的项目-快速CSV node.js
- 使用mongoose保存一个非常大的CSV到mongoDB
- 如何读取文件.csv非常大的FileReader API javascript, html/jquery只有
- D3.JS无法加载非常大的.csv文件