异步库是否有任何用于处理管道的控制流

Does the async library have any control flow for handling pipelines?

本文关键字:处理 管道 控制流 用于 任何 是否 异步      更新时间:2023-09-26

我正在研究异步库,但似乎找不到处理管道的控制流。我只是想知道我是不是错过了什么。

我想实现一个管道。示例:

let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next(null, input); });
var pipelineResult = pipelineRunner.run(pipeline, 'sample input', (error, result) => {});

解释:调用了一系列函数。每个函数接收一个input和一个next函数。每个函数处理input并将其作为参数传递给next函数。作为管道执行的结果,我得到了已处理的input,或者,如果任何函数调用next时出错,管道将停止并调用回调。

我想这是一个非常常见的用例,所以我认为async可以做到,但我找不到它。如果你知道有其他库可以达到这样的结果,那也是可以接受的。

您正在查找async.waterfall函数。

或者,如果您需要一个可以将初始input传递给的函数,则可以使用多个参数应用asyc.seqasync.compose

我最终自己实现了它,尽管正如@Bergi刚刚展示的那样,async确实支持它。

/**
 * Runs asynchronous pipelines
 */
class PipelineRunner {
    /**
     * Runs the given pipeline
     * @param pipeline - The array of functions that should be executed (middleware)
     * @param middlewareArgs - The array of arguments that should be passed in to the middleware
     * @param input
     * @param next
     */
    run(pipeline, middlewareArgs, input, next) {
        if (!pipeline) throw Error('''pipeline'' should be truthy');
        if (!context) throw Error('''context'' should be truthy');
        if (!input) throw Error('''input'' should be truthy');
        if (!next) throw Error('''next'' should be truthy');
        if (!pipeline.length) throw Error('''pipeline.length'' should be truthy');
        let index = 0;
        // the link function "binds" every function in the pipeline array together
        let link = (error, result) => {
            if (error) {
                next(error);
                return;
            }
            let nextIndex = index++;
            if (nextIndex < pipeline.length) {
                let args = [result].concat(middlewareArgs).concat(link);
                pipeline[nextIndex].apply(null, args);
            }
            else {
                next(null, result);
            }
        };
        let args = [input].concat(middlewareArgs).concat(link);
        pipeline[index++].apply(null, args);
    }
}
export default new PipelineRunner();

单元测试

import chai from 'chai';
import pipelineRunner from '../src/server/lib/pipelines/pipelineRunner';
let assert = chai.assert;

describe('PipelineRunner', () => {
    describe('run', function() {
        it('Happy path', () => {
            let pipeline = [];
            pipeline.push((input, next) => { next(null, input); });
            pipeline.push((input, next) => { next(null, input); });
            pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
                assert.strictEqual(result, "happy");
            });
        });
        it('Happy path - with arguments', () => {
            let pipeline = [];
            pipeline.push((input, someArgument, next) => {
                assert.strictEqual(someArgument, 'something that should be passed in');
                next(null, input);
            });
            pipeline.push((input, someArgument, next) => { next(null, input); });
            pipelineRunner.run(pipeline, ['something that should be passed in'], 'happy', (error, result) => {
                assert.strictEqual(result, "happy");
            });
        });
        it('When something goes wrong', () => {
            let pipeline = [];
            pipeline.push((input, next) => { next(null, input); });
            pipeline.push((input, next) => { next('something went wrong'); });
            pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
                assert.strictEqual(error, 'something went wrong');
            });
        });
    });
});