构造NodeJS异步代码以提高内存效率

Structuring NodeJS asynchronous code to make it more memory efficient

本文关键字:内存 效率 高内存 NodeJS 异步 代码 构造      更新时间:2023-09-26

我正在尝试建立一个简单的文档喂食器来基准ElasticSearch,我选择了NodeJS,因为我认为它最容易使用简单的JSON结构。不幸的是,我似乎是在搬起石头砸自己的脚。

以下是相关的部分:

  var logResults = function (results) {
    docsIndexed++;
    var now = +new Date();
    if (docsIndexed % 10000 === 0) {
      log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms");
    }
  }
  var submitDocument = function (source, index, type) {
    var doc = ejs.Document(index, type).source(source);
    doc.doIndex(logResults);
  }
  var schemas = {};
    _(10).times(function (idx) {
    schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10);
  });
  var docCount = 0, docsIndexed = 0, start = +new Date();
  Object.keys(schemas).forEach(function (schemaName, idx) {
    var register = function () {
      submitDocument(generateRandomDocument(schemas[schemaName]), 
        'documents', schemaName);
      docCount++;
    };
    _((idx + 1) * 1000).times(register);
    log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema " 
      + schemaName + ". Total: " + docCount);
  });

对于多达10万条记录的数据集来说,这工作得很好,但是如果我要处理数百万条记录,它就会因为内存不足而对我造成影响。

来自elastic.jsdoIndex函数是异步的,我怀疑许多对象在实际执行之前正在排队。当这个数字变得显著时,这个过程就终止了。我不明白为什么在循环结束之前没有执行回调。我想要的是一种方法,要么使它同步,要么为它提供某种池,这样它就不会在发送其他对象之前将更多对象排队。

有人可以建议一个库,可以帮助这个或更好的方式来构建代码?谢谢。


<标题> 更新

我已经尝试了Peter使用异步的建议。我想出了这个:

  /** Submit QUANT * (idx + 1) documents for each schema into the index */ 
  var QUANT = 100
    , docCount = 0
    , docsIndexed = 0
    , queue = async.queue(submitDocument, 1000)
    , paused = false
    , start = +new Date();
  queue.saturated = function () {
    log("queue is full");
    paused = true;
  };
  queue.empty = function () {
    log("All items were given to workers");
    paused = false; 
  };
  Object.keys(schemas).forEach(function (schemaName, idx) {
    var count = 0;
    while (count < (idx + 1) * QUANT) {
      if (!paused) {
        queue.push({ 
          source: generateRandomDocument(schemas[schemaName]), 
          index: 'documents', 
          type: schemaName 
        });
        count++; docCount++;
      }
    };
    log("Registered " + count + " documents for indexing for schema " 
      + schemaName + ". Total: " + docCount);
  });

如果它在循环中暂停,它将永远挂起(即queue.saturated被调用,pause被设置为true,然后程序被困在while循环中)。queue.empty callback永远不会被调用。如果队列的并发限制超过了我想要处理的数量,那么这种方法就可以很好地工作——所有消息都按预期记录。我应该在这里换什么?

<标题>更新# 2 h1> 已经改变了代码,使用异步循环,现在它的工作。我得到了一个RangeError: Maximum call stack size exceeded错误,我挣扎了一会儿。
  Object.keys(schemas).forEach(function (schemaName, idx) {
    var count = 0, executions = 0;
    async.whilst(
      function () { 
        var test = count < (idx + 1) * QUANT; 
        if (!test) log("Registered " + count + " documents for indexing for schema " 
          + schemaName + ". Executions: " + executions + ". Total: " + docCount);
        return test;
      },
      function (callback) {
        executions++;
        if (!paused) {
          queue.push({ source: generateRandomDocument(schemas[schemaName]), index: 'documents', type: schemaName });
          count++; docCount++;
        }
        setTimeout(callback, 0);
        // also tried with "return process.nextTick(callback)"
        // and return callback();
        // this blows up nicely with an out of memory error
      },
      function (err) {}
    );
  });

我开始感到沮丧,因为我不认为这个用例真的那么复杂,我希望我对语言的工作原理有一个公平的理解。

最好是async。有很大限制的队列。只是要确保你不要在队列已经饱和后继续添加项目。使用队列饱和作为回压,等待一些工作完成,然后开始将更多任务排队。队列有钩子来支持这些关键事件。