如何正确构建异步程序以确保正确的结果

How to correct structure an asynchronous program to ensure correct results?

本文关键字:结果 确保 何正确 构建 异步 程序      更新时间:2023-09-26

我有一个nodejs程序,它请求一系列XML文件,解析它们,然后将输出放在一个数组中,该数组作为CSV文件写入磁盘。

该程序大部分工作,但偶尔文件在数组中的顺序错误

我希望结果的顺序与URL的顺序相同。URL 存储在数组中,因此当我获取 XML 文件时,我会检查源数组中 URL 的索引,并将结果插入目标 URL 中的同一索引处。

谁能看到允许结果以错误顺序结束的缺陷?

addResult = function (url, value, timestamp) {
        data[config.sources.indexOf(url)] = {
            value : value,
            timestamp : timestamp,
            url : url
        };
        numResults++;
        if (numResults === config.sources.length) { //once all results are in build the output file
            createOutputData();
        }
    }
fs.readFile("config.json", function (fileError, data) {
    var eachSource, processResponse = function (responseError, response, body) {
        if (responseError) {
            console.log(responseError);
        } else {
            parseXML(body, {
                explicitArray : false
            }, function (xmlError, result) {
                if (xmlError) {
                    console.log(xmlError);
                }
                addResult(response.request.uri.href, result.Hilltop.Measurement.Data.E.I1, moment(result.Hilltop.Measurement.Data.E.T));
            });
        }
    };
    if (fileError) {
        console.log(fileError);
    } else {
        config = JSON.parse(data); //read in config file
        for (eachSource = 0; eachSource < config.sources.length; eachSource++) {
            config.sources[eachSource] = config.sources[eachSource].replace(/ /g, "%20"); //replace all %20 with " " 
            request(config.sources[eachSource], processResponse); //request each source
        }
    }
});
var writeOutputData, createOutputData, numResults = 0, data = [], eachDataPoint, multipliedFlow = 0;
writeOutputData = function (output, attempts) {
    csv.writeToPath(config.outputFile, [ output ], {
        headers : false
    }).on("finish", function () {
        console.log("successfully wrote data to: ", config.outputFile);
    }).on("error", function (err) { //on write error
        console.log(err);
        if (attempts < 2) { //if there has been less than 3 attempts try writing again after 500ms
            setTimeout(function () {
                writeOutputData(output, attempts + 1);
            }, 500);
        }
    });
};
createOutputData = function () {
    var csvTimestamp, output = [];
    if (config.hasOwnProperty("timestampFromSource")) {
        csvTimestamp = data.filter(function (a) {
            return a.url === config.sources[config.timestampFromSource];
        })[0].timestamp.format("HHmm");
        console.log("timestamp from source [" + config.timestampFromSource + "]:", csvTimestamp);
    } else {
        csvTimestamp = data.sort(function (a, b) { //sort results from oldest to newest
            return a.timestamp.unix() - b.timestamp.unix();
        });
        csvTimestamp = csvTimestamp[0].timestamp.format("HHmm");//use the oldest date for the timestamp
        console.log("timestamp from oldest source:", csvTimestamp);
    }
    //build array to represent data to be written
    output.push(config.plDestVar); //pl var head address first
    output.push(config.sources.length + 1); //number if vars to import
    output.push(csvTimestamp); //the date of the data 
    for (eachDataPoint = 0; eachDataPoint < data.length; eachDataPoint++) { //add each data point
        if (config.flowMultiplier) {
            multipliedFlow = Math.round(data[eachDataPoint].value * config.flowMultiplier); //round to 1dp and remove decimal by *10
        } else {
            multipliedFlow = Math.round(data[eachDataPoint].value * 10); //round to 1dp and remove decimal by *10
        }
        if (multipliedFlow > 32766) {
            multipliedFlow = 32766;
        } else if (multipliedFlow < 0) {
            multipliedFlow = 0;
        }
        output.push(multipliedFlow);
    }
    console.log(output);
    writeOutputData(output, 0); //write the results, 0 is signalling first attempt
};
我认为

索引代码的网址需要调试。下面是一个示例,该示例使用在 for 循环中预填充了键的对象。

'

var http = require('http');
var fs = require("fs");
var allRequestsComplete = function(results){
    console.log("All Requests Complete");
    console.log(results);
};
fs.readFile("urls.json", function (fileError, data) {
    var responseCount = 0;
     if (fileError) {
        console.log(fileError);
    } else {
        var allResponses = {};
        config = JSON.parse(data); //read in config file
        var requestComplete = function(url, fileData){
            responseCount++;
            allResponses[url] = fileData;
            if(responseCount===config.sources.length){
                allRequestsComplete(allResponses);
            }
        }; 
        for (var eachSource = 0; eachSource < config.sources.length; eachSource++) {
            (function(url){     
                allResponses[url] = "Waiting";
                http.get({host: url,path: "/"}, function(response) {
                    response.on('error', function (chunk) {
                        requestComplete(url, "ERROR");
                    });
                    var str = ''
                    response.on('data', function (chunk) {
                        str += chunk;
                    });
                    response.on('end', function () {
                        requestComplete(url, str);
                    });
                });
            }(config.sources[eachSource].replace(/ /g, "%20").replace("http://", "")));
        }
    }
});

'

我同意@Kevin B 的观点,您不能假设异步回调将以您发送它们的相同顺序返回。但是,您可以通过在 processResponse 上添加索引函数来确保顺序。

假设您将以下内容添加到添加结果

addResult = function (index, url, value, timestamp) {
    data[index] = {
        value : value,
        timestamp : timestamp,
        url : url
    };
    numResults++;
    if (numResults === config.sources.length) { //once all results are in build the output file
        createOutputData();
    }
}

并使用额外的函数来调用您的请求

function doRequest(index, url) {
    request(url, function(responseError, response, body) {
        if (responseError) {
            console.log(responseError);
        } else {
            parseXML(body, {
                    explicitArray : false
                }, function (xmlError, result) {
                    if (xmlError) {
                        console.log(xmlError);
                    }
                    addResult(index, response.request.uri.href, result.Hilltop.Measurement.Data.E.I1, moment(result.Hilltop.Measurement.Data.E.T));
            });
        }
    });
}

然后,您还可以将循环更改为:

    for (eachSource = 0; eachSource < config.sources.length; eachSource++) {
        config.sources[eachSource] = config.sources[eachSource].replace(/ /g, "%20"); //replace all %20 with " " 
        doRequest(eachSource, config.sources[eachSource]); //request each source
    }