停止并行运行的异步任务

stoping async tasks running in parallel

本文关键字:异步 任务 运行 并行      更新时间:2023-09-26

我试图(使用foreach(运行一个对象数组,然后为每个对象调用一个函数,该函数使用request获取文件,然后使用zlib解压缩文件,但考虑到节点的性质,一次一个,这目前是异步完成的。

我想做这样的事情
-foreach-第一个对象
-第一个对象的调用函数
-功能完成时
-转到阵列中的下一个对象

我尝试过使用SYNC模块来解决这个问题,但没有成功。

关于我该如何做到这一点,有什么想法吗?

// the function i am trying to run for each in sync    
var downloadUnzipFile = function(mID) {
      try {
        // Read File
        console.log("Started download/unzip of merchant: " + mID + " @ " + new Date().format('H:i:s').toString());
        request(linkConst(mID))
          // Un-Gzip
          .pipe(zlib.createGunzip())
          // Write File
          .pipe(fs.createWriteStream(fileName(mID)))
          .on('error', function(err) {
            console.error(err);
          })
          .on('finish', function() {
            console.log("CSV created: " + fileName(mID));
            console.log("Completed merchant: " + mID + " @ " + new Date().format('H:i:s').toString());
            //console.log("Parsing CSV...");
            //csvReader(fileName);
          });
      } catch (e) {
        console.error(e);
      }
    }
    module.exports = function(sMerchants) {
      var oMerchants = JSON.parse(JSON.stringify(sMerchants));
      sync(function() {
        oMerchants.forEach(function eachMerchant(merchant) {
          downloadUnzipFile(merchant.merchant_aw_id);
        })
      })
    };
var promiseQueue = (function() {
    'use strict';
    var promiseQueue = function() {
        var queue = [Promise.resolve(true)];
        var add = function(cb) {
            var args = Array.prototype.slice.call(arguments);
            args.shift();
            queue.unshift(new Promise(function(resolve) {
                queue[0].then(function() {
                    resolve(cb.apply(null, args));
                    queue.pop();
                });
            }));
        };
        return {
            add: add
        }
    }
    return promiseQueue;
}());

用法示例:这是将被称为的异步函数

var theFun = function (time, n) { // use whatever arguments you like that will be called with your function
    return new Promise(function(resolve) {
        //asynch function goes here INSTEAD of the setTimeout and it's contents, I repeat, INSTEAD of the setTimeout
        setTimeout(function() { // this is for demonstrating ONLY
            console.log('resolving', n, time); // this is for demonstrating ONLY
            resolve(time); // this is for demonstrating ONLY
        }, time); // this is for demonstrating ONLY
        // remember to resolve("someValueNotImportantAsItIsntUsedAnywhere") on completion of your asynch function
    });
}

这就是项目被添加到队列的方式——我这样做是因为我的用例

var pq = promiseQueue();
for(var i = 0; i < 5; i++ ) {
    var r = 1000 - i * 150;
    console.log('adding ', i, r);
    pq.add(theFun, r, i);
}

希望你能找到一些使用的方法

首先,函数需要进行回调,以便在完成后进行通信:

var downloadUnzipFile = function(mID, next) {
  try {
    // Read File
    console.log("Started download/unzip of merchant: " + mID + " @ " + new Date().format('H:i:s').toString());
    request(linkConst(mID))
      // Un-Gzip
      .pipe(zlib.createGunzip())
      // Write File
      .pipe(fs.createWriteStream(fileName(mID)))
      .on('error', function(err) {
        console.error(err);
      })
      .on('finish', function() {
        console.log("CSV created: " + fileName(mID));
        console.log("Completed merchant: " + mID + " @ " + new Date().format('H:i:s').toString());
        //console.log("Parsing CSV...");
        //csvReader(fileName);
        next();
      });
  } catch (e) {
    console.error(e);
    next();
  }
}

然后,当上一个完成时,我们需要递归地调用每一个:

module.exports = function(sMerchants, next) {
  var oMerchants = JSON.parse(JSON.stringify(sMerchants));
  var i = 0;
  var run = function() {
    if(i < oMerchants.length)
      downloadUnzipFile(i++, run);
    else
      next();
  };
};

请注意,我还为导出的函数添加了一个回调,这样它就可以在完成时进行通信。如果没有必要,可以将其删除。

这可能适用于您,使用Promise。需要将resolvereject回调添加到您的downloadUnzipFile-

var exports = (function () {
    'use strict';
    
    var pre = document.getElementById('out');
    
    function log(str) {
        pre.appendChild(document.createTextNode(str + ''n'));
    }
    
    function downloadUnzipFile(id, resolve, reject) {
        log('Start: ' + id);
        
        try {
            setTimeout(function () {
                resolve(id);
            }, 3000);
        } catch (e) {
            reject(e);
        }
    }
    function done(id) {
        log('Done: ' + id);
    }
    function error(e) {
        log(e.message);
    }
    function getPromise(mID) {
        return new Promise(function (resolve, reject) {
            downloadUnzipFile(mID, resolve, reject);
        });
    }
    return function (sMerchants) {
        JSON.parse(sMerchants).reduce(function (next, mID) {
            if (!next) {
                next = getPromise(mID);
            } else {
                next = next.then(function (id) {
                    done(id);
                    return getPromise(mID);
                }, error);
            }
            return next;
        }, null).then(done, error);
    };
}());
exports(JSON.stringify([1, 2, 3, 4, 5]));
<script src="https://cdnjs.cloudflare.com/ajax/libs/json2/20150503/json2.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/es5-shim/4.1.7/es5-shim.min.js"></script>
<script src="https://rawgit.com/jakearchibald/es6-promise/master/dist/es6-promise.min.js"></script>
<pre id="out"></pre>

我添加了浏览器垫片来支持可能正在查看的旧浏览器。因此,在node.js上不应该需要它们,但如果使用旧的node.js,则可能需要requirePromise垫片。