从async.each发送到两个不同的函数

From an async.each send to two differents functions

本文关键字:两个 函数 each async      更新时间:2023-09-26

我正在尝试这样做,我将信息接收到一个对象数组,然后我使用async.each函数,调用两个将数据发送到另一个api的函数,我使用ExpressJs。这是代码:

异步功能:

 async.each(readingsArray, function(reading, callback) {
    var firstTwo;
    firstTwo = reading.value.substring(0,2);
    if(firstTwo === '32') {
      updateLocation(reading, function() {
        callback();
      });
    } else {
      updateItem(reading, function() {
        callback();
      });
    }
  }, function(err) {
    if(err) {
      res.send(Boom.badRequest('Can't sent data'));
    } else {
      res.send({
        statusCode: 200,
        message: 'Data sent'
      });
    }
  });

另外两个功能:

    var updateLocation = function updateLocation (reading, callback) {
    request.patch(config.url + '/handhelds/' + reading.id)
    .send({
      //some information
    })
    .end(function(err, res) {
      if (err) {
        callback(err);
      } else {
        callback();
      }
    });
  };

  var updateItem = function updateItem(lectura, callback) {
  request.post(config.url + '/items/' + reading.id +    '/event')
  .send({
   //some information
  })
  .end(function(err, res) {
    if (err) {
      callback(err);
    } else {
        callback();
      }
    });
};

我仍在努力理解nodejs的异步部分。我想做的是更新设备位置,然后用另一个函数updateItem发送项目,直到调用另一个updateLocation,然后继续发送项目。这就是为什么我在Async中放置了一个从位置或项进行标识的if。有可能吗?

第1版:

以下是我使用的所有代码:

'use strict';
var express = require('express');
var bodyParser = require('body-parser');
var request = require('superagent');
var async = require('async');
var moment = require('moment-timezone');
var Boom = require('boom');
var config = require('./config.js');
var app = express();
var getReadings = function getReadings(csvValue, mac) {
//Here I return the array of objects
  var readings, values;
  var arrayFromCSV = function arrayFromCSV(csv) {
    var array = values.replace(/'n/g, ',').replace(/''/g, '').replace(/'"/g, '').split(',');
    array.pop(); 
    return array;
  };
  var epochToISO = function epochToISO(epochValue) {
    var epochInMilliseconds = parseInt(epochValue, 10) / 1000;
    var date = new Date(epochInMilliseconds);
    return date.toISOString();
  };
  var convertToArgentinaTimezone = function convertToArgTimezone(utcTime) {
    return moment.tz(utcTime, 'America/Argentina/Buenos_Aires').format();
  };
  var separateValues = function separateValues(readings) {
    var items = [];
    for (var i = 0; i < lecturas.length; i += 4) {
      var newObject = {
        epc: lecturas[i + 1],
        mac: mac,
        timeStamp: convertToArgentinaTimezone(epochToISO(lecturas[i + 2]))
      };
      items.push(newObject);
    }
    return items;
  };
  lecturas = arrayFromCSV(valorCSV);
  epcs = separarPorEPC(lecturas);
  return epcs;
};
 var updateLocation = function updateLocation (reading, callback) {
    request.patch(config.url + '/handhelds/' + reading.id)
    .send({
      //some information
    })
    .end(function(err, res) {
      if (err) {
        callback(err);
      } else {
        callback();
      }
    });
  };
var updateItem = function updateItem(lectura, callback) {
  request.post(config.url + '/items/' + reading.id +    '/event')
  .send({
   //some information
  })
  .end(function(err, res) {
    if (err) {
      callback(err);
    } else {
        callback();
      }
    });
};
app.use(bodyParser.urlencoded({
  extended: true
}));
app.post('/', function(req, res) {
  var readingsInValues = req.body.field_values;
  var mac = req.body.reader_name.replace(/''/g, '').replace(/'"/g, '');
  var readingArrays = getReadings(readingsInValues, mac);
   async.each(readingsArrays, function(reading, callback) {
    var firstTwo;
    firstTwo = reading.value.substring(0,2);
    if(firstTwo === '32') {
      updateLocation(reading, function() {
        callback();
      });
    } else {
      updateItem(reading, function() {
        callback();
      });
    }
  }, function(err) {
    if(err) {
      res.send(Boom.badRequest('Can't sent data'));
    } else {
      res.send({
        statusCode: 200,
        message: 'Data sent'
      });
    }
  });
});
console.log('Traductor encendido');
app.listen(config.port);

阵列如下:

      [ { id: '32144B5A1231200000001',
        mac: '00:16:25:10:57:E9',
        timeStamp: '2015-11-20T10:28:26-03:00' },
      { id: '30142F13F44123F0000057',
        mac: '00:16:25:10:57:E9',
        timeStamp: '2015-11-20T10:28:28-03:00' },
      { id: '32144B1231230000001AB',
        mac: '00:16:25:10:57:E9',
        timeStamp: '2015-11-20T10:29:09-03:00' },
      { id: '30142F13F0234234000055',
        mac: '00:16:25:10:57:E9',
        timeStamp: '2015-11-20T10:30:19-03:00' } ]

希望这能澄清我的问题。

我假设您的readingsArray可能包含以下数据:

[
  '32_LocationA',
  '0000_someDataAboutLocationA',
  '0001_someDataAboutLocationA',
  '32_LocationB',
  '0002_someDataAboutLocationB',
  '0003_someDataAboutLocationB'
]

这是6条独立的消息,按一定顺序收集。

您应该理解的是,async.each将同时发送所有消息,而不考虑订单。if语句只是将每条消息路由到不同的URL(基于firstTwo值)。我会称之为你的discriminatorFn

如果您关心ORDER,那么应该通过对需要按特定顺序排列的消息进行分组来预处理数组。例如,您可能希望在00000001消息之前发送32_LocationA消息。与32_LocationB00020003相同。让我们把这些称为"批次"。

一个批次可能只是相关读数的子阵列,因此一个批次阵列是一个2D阵列,每个批次都按照"正确"的顺序:

[
   // batch A:
   [ '32_LocationA', '0000...', '0001...' ],
   // batch B: 
   [ '32_LocationB', '0002...', '0003...' ]
]

对于每个批次,您可能希望确保先调用updateLocation,但可能不关心哪个批次先调用(它们可以同时发送)。这是async.eachasync.eachSeries的用例,例如:

async.each(batches, function(batch, callback) {
  async.eachSeries(batch, discriminatorFn, callback); 
}, function(err) { 
  // results
});

eachSeries处理batch数组中的每个读取,并等待迭代器函数(discriminatorFn)在调用下一个读取之前调用其callback参数。

希望这能有所帮助。

编辑

// uses the `reduce` function to transform
// `p` is the accumulator, "previousValue"
// `c` is the current element of the array, "currentValue" 
var batched = readingsArray.reduce(function(p, c) {
  if(c.value.substring(0,2) == '32') { p.push([]); }
  p[p.length-1].push(c); 
  return p;
}, []);

如果request来自Express,则不适用于您想要做的事情,因此这将不起作用。它用于托管一个响应客户端请求的服务器。

这将帮助您从代码中执行http调用:https://nodejs.org/api/http.html#http_http_request_options_callback.或者你可以使用现有的包来简化你的代码(例如)

Async不保证处理读数的任何顺序,你确定这是你想要的吗?

此外,您还可以简化代码,callback已经是一个函数,因此不需要将其封装在函数中。

async.each(readingsArray, function (reading, callback) {
  var firstTwo;
  firstTwo = reading.value.substring(0, 2);
  if (firstTwo === '32') {
    updateLocation(reading, callback);      // <<<<<<<<<< here
  } else {
    updateItem(reading, callback);          // <<<<<<<<<< here
  }
}, function (err) {
  if (err) {
    res.send(Boom.badRequest("Can't sent data"));
  } else {
    res.send({
      statusCode: 200,
      message: 'Data sent'
    });
  }
});
var updateLocation = function updateLocation(reading, callback) {
  request.patch(config.url + '/handhelds/' + reading.id)
    .send({
      //some information
    }).end(callback);                       // <<<<<<<<<< here
};

var updateItem = function updateItem(lectura, callback) {
  request.post(config.url + '/items/' + reading.id + '/event')
    .send({
      //some information
    }).end(callback);                      // <<<<<<<<<< here
};