Node.js并行执行

node.js parallel execution

本文关键字:并行执行 js Node      更新时间:2023-09-26

我正在尝试学习node.js并行执行。我编写了下面的示例代码。但是,输出是串行的。第一个0 . .99被打印出来,然后100..200.

我理解这是因为node.js本质上是单线程的,在循环中,线程被for循环捕获。

我想理解的是在什么情况下这个flow.parallel结构是有用的?在node.js中,任何对I/O或数据库的请求都将是异步的。那么为什么我们需要flow.parallel呢?

var flow = require('nimble');

flow.parallel([
    function a(callback)
    {
        for(var i=0;i<100;++i)
        {
            console.log(i);
        }
            callback();
    },
    function b(callback)
    {
        for (var i=100;i<200;++i)
        {
            console.log(i);
        }
        callback();
    }
    ]);

在大多数情况下,使用这样的并行流,您不会在for循环中打印一堆数字(这恰好会阻塞执行)。当你注册你的函数时,它们以你在传递给parallel的数组中定义它们的相同顺序注册。在上面的例子中,首先是function a,其次是function b。因此,Node的事件循环将首先调用a(),然后在一个未公开的时间调用b()。因为我们知道这些for循环是阻塞的,并且node在单个线程中运行,所以它必须在a()中完成整个for循环,并在node的事件循环再次控制它之前最终返回,其中b()正在队列中等待类似的处理。

为什么并行流控制结构有用?按照设计,您不应该在节点内执行阻塞操作(参见示例)。a()消耗整个线程,然后b()将在其他任何事情发生之前消耗整个线程。

a()  b()
 |
 |
 |
 |
RET
     |
     |
     |
     |
    RET

现在,假设你正在制作一个web应用程序,用户可以在其中注册并同时上传图片。您的用户注册代码可能如下所示:

var newUser = {
  username: 'bob',
  password: '...', 
  email: 'bob@example.com',
  picture: '20140806-210743.jpg'
}
var file = path.join(img.IMG_STORE_DIR, newUser.picture);
flow.parallel([
  function processImage(callback) {
    img.process(function (err) {
      if (err) return callback(err); 
      img.save(file, function (err) {
        return callback(err); // err should be falsey if everything was good
      })
    });
  },
  function dbInsert(callback) {
    db.doQuery('insert', newUser, function (err, id) {
      return callback(err);
    });
  }
], function () {
  // send the results to the user now to let them know they are all registered! 
});

这里的内部函数是非阻塞的,并且都调用处理或网络负载操作。然而,它们彼此是相当独立的。你不需要一个结束另一个开始。在我们看不到代码的函数中,它们使用了更多带有函数回调的异步调用,每个回调都为Node处理另一个项目排队。Node将尝试清空队列,在CPU周期之间均匀地分配工作负载。

我们希望这样的事情正在发生:

a = processImage
b = dbInsert
a()  b()
 |
      |
 |
      |
 |   
      |
 |
RET   |
     RET

如果我们将它们串联在一起,也就是说,你必须在插入数据库之前等待图像完全被处理,你必须做很多等待。如果系统上的IO非常高,节点将会在等待操作系统时无所事事。相比之下,从理论上讲,使用并行将允许较慢的操作让位给较快的操作。

如果Node自己做这些,为什么我们真的需要它?关键字在您省略的第二个参数中。

nimble.parallel([a,b], function () {
  // both functions have now returned and called-back. 
}); 

您现在可以看到两个任务何时完成,node默认情况下不会这样做,所以它可以是一个相当有用的东西。

flow.parallel为您提供了可重用的逻辑,用于确定所有并行操作何时完成。是的,如果您只执行db.query('one');db.query('two');db.query('three');,它们将通过异步的性质并行执行,但是您必须编写一些样板代码来跟踪它们何时全部完成以及是否遇到错误。这是flow.parallel(或任何流量控制库中的对应部分)提供的部分。

Node.js中的并行执行

使用Nodejs在并行执行中读取文件目录

创建dir>

mkdir演示

创建文件

demo.txt、demo2.txt demo3.txt

每个文件有一些包含或段落

创建word_count.js文件

var fs = require('fs');
var completedTasks = 0;
var tasks = [];
var wordCounts = {};
var filesDir = './test';
function checkIfComplete() {
      completedTasks++;
    if(completedTasks == tasks.length){
          for (var index in wordCounts){
            console.log(index +': ' + wordCounts[index]);
         }
      }
 }
 function countWordsInText(text) {
     var words = text
         .toString()
         .toLowerCase()
         .split(/'W+/)
         .sort();
     for (var index in words) {
       var word = words[index];
        if(word) {
          wordCounts[word] = (wordCounts[word]) ? wordCounts[word] + 1 : 1;
       }
    }
 }
 fs.readdir(filesDir, function(err, files){
   if(err) throw err;
   for (var index in files) {
     var task =(function (file) {
       return function() {
         fs.readFile(file, function(err, text) {
           if(err) throw err;
            countsInText(text);
            checkIfComplete();
         });
       }
     })(filesDir + '/' + files[index]);
     tasks.push(task);
   }
   for (var task in tasks) {
     tasks[task] ();
   }
 });