如何使用事件和承诺控制程序流

How can I control program flow using events and promises?

本文关键字:控制 程序 承诺 何使用 事件      更新时间:2023-09-26

我有一个这样的类:

import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';
class MyClass extends EventEmitter {
    constructor(host = 'localhost', port = 10011) {
        super(EventEmitter);
        this.host = host;
        this.port = port;
        this.socket = null;
        this.connect();
    }
    connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', this.handle.bind(this));
    }
    handle(data) {
        this.socket.on('data', data => {
        });
    }
    send(data) {
        this.socket.write(data);
    }
}

我如何把send方法变成一个承诺,它从套接字的data事件返回一个值?只有当数据被发送给服务器时,服务器才会发送回数据,而不是一个容易被抑制的连接消息。

我试过这样做:

handle(data) {
    this.socket.on('data', data => {
        return this.socket.resolve(data);
    });
    this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
    return new Promise((resolve, reject) => {
        this.socket.resolve = resolve;
        this.socket.reject = reject;
        this.socket.write(data);
    });
}

显然这是行不通的,因为resolve/reject在链接和/或并行调用send多次时会相互覆盖。

也有并行调用send两次的问题,它解决哪个响应先返回。

我目前有一个使用队列和延迟的实现,但由于队列不断被检查,因此感觉很混乱。

我希望能够做到以下几点:

let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
    return c.send('bar', response.param);
    //`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
    console.log(response);
}).catch(error => console.log(error));

只是补充一下,我对接收的数据没有任何控制,这意味着它不能在流之外被修改。

Edit:所以这似乎是不可能的,因为TCP没有请求-响应流。这是如何实现的,仍然使用承诺,但要么使用单执行(一次一个请求)承诺链或队列。

我将问题简化到最小,并使其在浏览器中可运行:

  1. Socket类被模拟
  2. 删除EventEmitter的端口、主机和继承信息。

解决方案的工作原理是将新请求附加到承诺链中,但在任何给定的时间点允许最多一个打开/未应答的请求。.send每次被调用时都会返回一个新的promise,该类负责所有内部同步。因此,可以多次调用.send,并保证请求处理的正确顺序(FIFO)。我添加的另一个功能是修剪承诺链,如果没有挂起的请求。


注意我完全省略了错误处理,但无论如何都应该针对您的特定用例进行调整。


class SocketMock {
  constructor(){
    this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) ); 
    this.listeners = {
  //  'error' : [],
    'data' : []
    }
  }
  send(data){
    console.log(`SENDING DATA: ${data}`);
    var response = `SERVER RESPONSE TO: ${data}`;
    setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),               
               Math.random()*2000 + 250); 
  }
  on(event, callback){
    this.listeners[event].push(callback); 
  }
}
class SingleRequestCoordinator {
    constructor() {
        this._openRequests = 0; 
        this.socket = new SocketMock();
        this._promiseChain = this.socket
            .connected.then( () => console.log('SOCKET CONNECTED'));
      this.socket.on('data', (data) => {
        this._openRequests -= 1;
        console.log(this._openRequests);
        if(this._openRequests === 0){
          console.log('NO PENDING REQUEST --- trimming the chain');
          this._promiseChain = this.socket.connected
        }
        this._deferred.resolve(data);
      });
    }
    send(data) {
      this._openRequests += 1;
      this._promiseChain = this._promiseChain
        .then(() => {
            this._deferred = Promise.defer();
            this.socket.send(data);
            return this._deferred.promise;
        });
      return this._promiseChain;
    }
}
var sender = new SingleRequestCoordinator();
sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
setTimeout(() => sender.send('data-4')
    .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);

如果您的send()调用相互混淆,您应该将其保存到缓存中。为了确保接收到的消息与发送的消息匹配,您应该为每条消息分配一些唯一的id到有效负载中。

所以你的消息发送器看起来像这样

class MyClass extends EventEmitter {
  constructor() {
    // [redacted]
    this.messages = new Map();
  }
  handle(data) {
    this.socket.on('data', data => {
       this.messages.get(data.id)(data);
       this.messages.delete(data.id);
    });
  }
  send(data) {
    return return new Promise((resolve, reject) => {
        this.messages.set(data.id, resolve);
        this.socket.write(data);
    });
  }
}

这段代码对消息顺序不敏感,你会得到你想要的API

socket.write(data[, encoding][, callback])接受回调。你可以在这个回调中拒绝或解析。

class MyClass extends EventEmitter {
  constructor(host = 'localhost', port = 10011) {
    super(EventEmitter);
    this.host = host;
    this.port = port;
    this.socket = null;
    this.requests = null;
    this.connect();
  }
  connect() {
    this.socket = net.connect(this.port, this.host);
    this.socket.on('connect', () => {
      this.requests = [];
      this.socket.on('data', this.handle.bind(this));
      this.socket.on('error', this.error.bind(this));
    });
  }
  handle(data) {
    var [request, resolve, reject] = this.requests.pop();
    // I'm not sure what will happen with the destructuring if requests is empty
    if(resolve) {
      resolve(data);
    }
  }
  error(error) {
    var [request, resolve, reject] = this.requests.pop();
    if(reject) {
      reject(error);
    }
  }
  send(data) {
    return new Promise((resolve, reject) => {
      if(this.requests === null) {
        return reject('Not connected');
      }
      this.requests.push([data, resolve, reject]);
      this.socket.write(data);
    });
  }
}

未测试,因此不确定方法签名,但这是基本思想。

这里假设每个请求将有一个handleerror事件。

我想得越多,这似乎不可能没有额外的信息在您的应用程序数据,如数据包编号匹配一个请求的响应。

它现在实现的方式(也是你的问题的方式),它甚至不确定一个答案将完全匹配一个handle事件。