如何最有效地更新MongoDB中的大量文档

How to update a large number of documents in MongoDB most effeciently?

本文关键字:文档 MongoDB 何最 有效地 更新      更新时间:2023-09-26

我想最有效地更新大量(>10000)文档。

我的第一个天真的方法是在JS级别上进行,编写脚本首先获取_id,然后循环遍历_id并通过_id调用更新(fulldocs或$set补丁)。

我遇到了内存问题,还将数据分割成最多500个块文档(打开和关闭连接)似乎工作不好。

那么我该如何在MongoDB级别上解决这个问题呢
最佳实践?

我有3个常见的用例,通常是维护工作流:

1.更改属性值的类型,而不更改值

// before
{
  timestamp : '1446987395'
}
// after
{
  timestamp : 1446987395
}

2.根据现有属性的值添加新属性

// before
{
  firstname : 'John',
  lastname  : 'Doe'
}
// after
{
  firstname : 'John',
  lastname  : 'Doe',
  name      : 'John Doe'
}

3.只需添加或删除文档中的属性

// before
{
  street    : 'Whatever Ave',
  street_no : '1025'
}
// after
{
  street    : 'Whatever Ave',
  no        : '1025'
}

谢谢你帮忙。

如果您的MongoDB服务器是2.6或更高版本,最好使用允许执行批量update操作的写命令Bulk API,这些操作只是服务器上的抽象,以便轻松构建批量操作。这些批量操作主要有两种类型:

  • 订单批量操作。这些操作按顺序执行所有操作,并在第一次写入错误时出错
  • 无序批量操作。这些操作并行执行所有操作,并汇总所有错误。无序的批量操作不能保证执行顺序

注意,对于2.6以上的旧服务器,API将对操作进行下变频。然而,不可能100%下变频,因此可能存在一些边缘情况,即它无法正确报告正确的数字。

对于您的三个常见用例,您可以实现这样的Bulk API:

情况1.更改属性值的类型,而不更改值:

var MongoClient = require('mongodb').MongoClient;
MongoClient.connect("mongodb://localhost:27017/test", function(err, db) {
    // Handle error
    if(err) throw err;
    // Get the collection and bulk api artefacts
    var col = db.collection('users'),           
        bulk = col.initializeOrderedBulkOp(), // Initialize the Ordered Batch
        counter = 0;        
    // Case 1. Change type of value of property, without changing the value.        
    col.find({"timestamp": {"$exists": true, "$type": 2} }).each(function (err, doc) {
        var newTimestamp = parseInt(doc.timestamp);
        bulk.find({ "_id": doc._id }).updateOne({
            "$set": { "timestamp": newTimestamp }
        });
        counter++;
        if (counter % 1000 == 0 ) {
            bulk.execute(function(err, result) {  
                // re-initialise batch operation           
                bulk = col.initializeOrderedBulkOp();
            });
        }
    });
    if (counter % 1000 != 0 ){
        bulk.execute(function(err, result) {
            // do something with result
            db.close();
        }); 
    } 
});

案例2.根据现有属性的值添加新属性:

MongoClient.connect("mongodb://localhost:27017/test", function(err, db) {
    // Handle error
    if(err) throw err;
    // Get the collection and bulk api artefacts
    var col = db.collection('users'),           
        bulk = col.initializeOrderedBulkOp(), // Initialize the Ordered Batch
        counter = 0;        
    // Case 2. Add new property based on value of existing property.        
    col.find({"name": {"$exists": false } }).each(function (err, doc) {
        var fullName = doc.firstname + " " doc.lastname;
        bulk.find({ "_id": doc._id }).updateOne({
            "$set": { "name": fullName }
        });
        counter++;
        if (counter % 1000 == 0 ) {
            bulk.execute(function(err, result) {  
                // re-initialise batch operation           
                bulk = col.initializeOrderedBulkOp();
            });
        }
    });
    if (counter % 1000 != 0 ){
        bulk.execute(function(err, result) {
            // do something with result
            db.close();
        }); 
    } 
});

案例3.只需添加或删除文档中的属性。

MongoClient.connect("mongodb://localhost:27017/test", function(err, db) {
    // Handle error
    if(err) throw err;
    // Get the collection and bulk api artefacts
    var col = db.collection('users'),           
        bulk = col.initializeOrderedBulkOp(), // Initialize the Ordered Batch
        counter = 0;        
    // Case 3. Simply adding removing properties from documents.    
    col.find({"street_no": {"$exists": true } }).each(function (err, doc) {
        bulk.find({ "_id": doc._id }).updateOne({
            "$set": { "no": doc.street_no },
            "$unset": { "street_no": "" }
        });
        counter++;
        if (counter % 1000 == 0 ) {
            bulk.execute(function(err, result) {  
                // re-initialise batch operation           
                bulk = col.initializeOrderedBulkOp();
            });
        }
    });
    if (counter % 1000 != 0 ){
        bulk.execute(function(err, result) {
            // do something with result
            db.close();
        }); 
    } 
});