MongoDB 中使用 mongoose 的批量更新插入
Bulk upsert in MongoDB using mongoose
有什么选项可以对猫鼬进行批量更新插入吗?所以基本上有一个数组并在每个元素不存在时插入它,或者如果存在则更新它?(我正在使用海关_ids(
当我使用 .insert MongoDB 时,重复键返回错误 E11000(应更新(。不过,插入多个新文档效果很好:
var Users = self.db.collection('Users');
Users.insert(data, function(err){
if (err) {
callback(err);
}
else {
callback(null);
}
});
使用 .save 会返回一个错误,指出参数必须是单个文档:
Users.save(data, function(err){
...
}
这个答案表明没有这样的选项,但它是特定于 C# 的,并且已经有 3 年的历史了。所以我想知道是否有任何选择可以使用猫鼬来做到这一点?
谢谢!
不是专门在"猫鼬"中,或者至少在撰写本文时还没有。从2.6版本开始,MongoDB shell实际上使用"引擎盖下"的"批量操作API",就像所有通用辅助程序方法一样。在它的实现中,它首先尝试执行此操作,如果检测到旧版本的服务器,则会对旧版实现进行"回退"。
所有猫鼬方法"当前"都使用"遗留"实现或写入关注响应和基本遗留方法。但是,任何给定的猫鼬模型都有一个.collection
访问器,它本质上是从实现猫鼬本身的底层"节点本机驱动程序"访问"集合对象":
var mongoose = require('mongoose'),
Schema = mongoose.Schema;
mongoose.connect('mongodb://localhost/test');
var sampleSchema = new Schema({},{ "strict": false });
var Sample = mongoose.model( "Sample", sampleSchema, "sample" );
mongoose.connection.on("open", function(err,conn) {
var bulk = Sample.collection.initializeOrderedBulkOp();
var counter = 0;
// representing a long loop
for ( var x = 0; x < 100000; x++ ) {
bulk.find(/* some search */).upsert().updateOne(
/* update conditions */
});
counter++;
if ( counter % 1000 == 0 )
bulk.execute(function(err,result) {
bulk = Sample.collection.initializeOrderedBulkOp();
});
}
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
// maybe do something with result
});
});
主要问题是"猫鼬方法"实际上知道实际上可能还没有建立连接,并"排队"直到完成。您正在"挖掘"的本机驱动程序不会进行此区分。
因此,您确实必须意识到连接是以某种方式或形式建立的。但是,只要您小心处理正在做的事情,就可以使用本机驱动程序方法。
您不需要按照 @neil-lunn 的建议管理限制 (1000(。猫鼬已经这样做了。我用他的精彩回答作为这个完整的基于Promise的实现和示例的基础:
var Promise = require('bluebird');
var mongoose = require('mongoose');
var Show = mongoose.model('Show', {
"id": Number,
"title": String,
"provider": {'type':String, 'default':'eztv'}
});
/**
* Atomic connect Promise - not sure if I need this, might be in mongoose already..
* @return {Priomise}
*/
function connect(uri, options){
return new Promise(function(resolve, reject){
mongoose.connect(uri, options, function(err){
if (err) return reject(err);
resolve(mongoose.connection);
});
});
}
/**
* Bulk-upsert an array of records
* @param {Array} records List of records to update
* @param {Model} Model Mongoose model to update
* @param {Object} match Database field to match
* @return {Promise} always resolves a BulkWriteResult
*/
function save(records, Model, match){
match = match || 'id';
return new Promise(function(resolve, reject){
var bulk = Model.collection.initializeUnorderedBulkOp();
records.forEach(function(record){
var query = {};
query[match] = record[match];
bulk.find(query).upsert().updateOne( record );
});
bulk.execute(function(err, bulkres){
if (err) return reject(err);
resolve(bulkres);
});
});
}
/**
* Map function for EZTV-to-Show
* @param {Object} show EZTV show
* @return {Object} Mongoose Show object
*/
function mapEZ(show){
return {
title: show.title,
id: Number(show.id),
provider: 'eztv'
};
}
// if you are not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}
// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
// if(err) return console.log('EZ Error:', err);
// var shows = shows.map(mapEZ);
console.log('found', shows.length, 'shows.');
connect('mongodb://localhost/tv', {}).then(function(db){
save(shows, Show).then(function(bulkRes){
console.log('Bulk complete.', bulkRes);
db.close();
}, function(err){
console.log('Bulk Error:', err);
db.close();
});
}, function(err){
console.log('DB Error:', err);
});
// });
这样做的好处是在连接完成后关闭连接,如果您关心,则显示任何错误,但如果不关心,则忽略它们(承诺中的错误回调是可选的。它也非常快。只是把这个留在这里分享我的发现。例如,如果您想将所有 eztv 节目保存到数据库中,您可以取消注释 eztv 内容。
await Model.bulkWrite(docs.map(doc => ({
updateOne: {
filter: {id: doc.id},
update: doc,
upsert: true
}
})))
或者更详细:
const bulkOps = docs.map(doc => ({
updateOne: {
filter: {id: doc.id},
update: doc,
upsert: true
}
}))
Model.bulkWrite(bulkOps)
.then(bulkWriteOpResult => console.log('BULK update OK:', bulkWriteOpResult))
.catch(err => console.error('BULK update error:', err))
https://stackoverflow.com/a/60330161/5318303
我已经发布了一个 Mongoose 插件,它公开了一个静态upsertMany
方法,以使用 promise 接口执行批量更新插入操作。
与在底层集合上初始化您自己的批量操作相比,使用此插件的另一个好处是,此插件首先将您的数据转换为 Mongoose 模型,然后在更新之前返回到普通对象。这可确保应用 Mongoose 架构验证,并且数据将减少填充并适合原始插入。
https://github.com/meanie/mongoose-upsert-manyhttps://www.npmjs.com/package/@meanie/mongoose-upsert-many
希望对您有所帮助!
如果您在 db.collection 中没有看到批量方法,即您会收到一个错误,大意是xxx 变量没有方法:initializeOrderedBulkOp((
尝试更新您的猫鼬版本。 显然,较旧的猫鼬版本不会通过所有底层的mongo db.collection方法。
npm 安装猫鼬
为我照顾好了。
我最近必须在将产品存储在我的电子商务应用程序中时实现这一目标。我的数据库曾经超时,因为我必须每 4 小时更新 10000 个项目。我的一个选择是在连接到数据库时在猫鼬中设置 socketTimeoutMS 和 connectTimeoutMS,但它感觉有点笨拙,我不想操纵数据库的连接超时默认值。我还看到 @neil lunn 的解决方案采用了一种简单的同步方法,即在 for 循环中获取模数。这是我的异步版本,我相信它可以更好地完成工作
let BATCH_SIZE = 500
Array.prototype.chunk = function (groupsize) {
var sets = [];
var chunks = this.length / groupsize;
for (var i = 0, j = 0; i < chunks; i++ , j += groupsize) {
sets[i] = this.slice(j, j + groupsize);
}
return sets;
}
function upsertDiscountedProducts(products) {
//Take the input array of products and divide it into chunks of BATCH_SIZE
let chunks = products.chunk(BATCH_SIZE), current = 0
console.log('Number of chunks ', chunks.length)
let bulk = models.Product.collection.initializeUnorderedBulkOp();
//Get the current time as timestamp
let timestamp = new Date(),
//Keep track of the number of items being looped
pendingCount = 0,
inserted = 0,
upserted = 0,
matched = 0,
modified = 0,
removed = 0,
//If atleast one upsert was performed
upsertHappened = false;
//Call the load function to get started
load()
function load() {
//If we have a chunk to process
if (current < chunks.length) {
console.log('Current value ', current)
for (let i = 0; i < chunks[current].length; i++) {
//For each item set the updated timestamp to the current time
let item = chunks[current][i]
//Set the updated timestamp on each item
item.updatedAt = timestamp;
bulk.find({ _id: item._id })
.upsert()
.updateOne({
"$set": item,
//If the item is being newly inserted, set a created timestamp on it
"$setOnInsert": {
"createdAt": timestamp
}
})
}
//Execute the bulk operation for the current chunk
bulk.execute((error, result) => {
if (error) {
console.error('Error while inserting products' + JSON.stringify(error))
next()
}
else {
//Atleast one upsert has happened
upsertHappened = true;
inserted += result.nInserted
upserted += result.nUpserted
matched += result.nMatched
modified += result.nModified
removed += result.nRemoved
//Move to the next chunk
next()
}
})
}
else {
console.log("Calling finish")
finish()
}
}
function next() {
current++;
//Reassign bulk to a new object and call load once again on the new object after incrementing chunk
bulk = models.Product.collection.initializeUnorderedBulkOp();
setTimeout(load, 0)
}
function finish() {
console.log('Inserted ', inserted + ' Upserted ', upserted, ' Matched ', matched, ' Modified ', modified, ' Removed ', removed)
//If atleast one chunk was inserted, remove all items with a 0% discount or not updated in the latest upsert
if (upsertHappened) {
console.log("Calling remove")
remove()
}
}
/**
* Remove all the items that were not updated in the recent upsert or those items with a discount of 0
*/
function remove() {
models.Product.remove(
{
"$or":
[{
"updatedAt": { "$lt": timestamp }
},
{
"discount": { "$eq": 0 }
}]
}, (error, obj) => {
if (error) {
console.log('Error while removing', JSON.stringify(error))
}
else {
if (obj.result.n === 0) {
console.log('Nothing was removed')
} else {
console.log('Removed ' + obj.result.n + ' documents')
}
}
}
)
}
}
你可以使用 mongoose 的 Model.bulkWrite((
const res = await Character.bulkWrite([
{
updateOne: {
filter: { name: 'Will Riker' },
update: { age: 29 },
upsert: true
}
},
{
updateOne: {
filter: { name: 'Geordi La Forge' },
update: { age: 29 },
upsert: true
}
}
]);
参考 : https://masteringjs.io/tutorials/mongoose/upsert
- 如何使用Extendscript scriptui插入、更新、删除XML元素的值
- 如何在手动插入一些文本后更新文本区域的ng模型
- Mongodb查看数组中是否所有项都存在,并更新else插入
- 在浏览器中更新页面后,动态插入的页面内容将消失
- 是否可以在文本区域中插入文本并更新撤消/重做队列
- 如何使用angularJs/Javascript/Jquery获取表的最后一条记录并更新列值并插入到具有新记录的同一列
- 如何在 angularjs 中插入和更新查询
- 在插入数据库后更新 Meteor js 中的客户端视图
- 环回 - 基于 id 以外的其他属性的 REST 更新插入/更新
- 使用 PHP 创建、插入、更新或删除 XML
- 如何更新用户插入的值(使用 javascript 和 php)
- 如何将复选框值正确插入和更新到数据库中
- 使用 Ajax 和 PHP 插入和更新页面而无需重新加载
- 如果找不到文档,则不插入,而是更新插入
- 如果字段不在新文档中,则更新插入会删除该字段
- KnockoutJS:使用映射将数据更新/插入到viewModel
- 用于快速读取、不频繁更新/插入的文件或数据库
- MongoDB 中使用 mongoose 的批量更新插入
- 使用PHP Laravel 4框架的angularjs中的Timeout函数来更新插入的数据
- 通过ajax代理进行CRUD批量更新/插入