如何使用事件和承诺控制程序流
How can I control program flow using events and promises?
我有一个这样的类:
import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';
class MyClass extends EventEmitter {
constructor(host = 'localhost', port = 10011) {
super(EventEmitter);
this.host = host;
this.port = port;
this.socket = null;
this.connect();
}
connect() {
this.socket = net.connect(this.port, this.host);
this.socket.on('connect', this.handle.bind(this));
}
handle(data) {
this.socket.on('data', data => {
});
}
send(data) {
this.socket.write(data);
}
}
我如何把send
方法变成一个承诺,它从套接字的data
事件返回一个值?只有当数据被发送给服务器时,服务器才会发送回数据,而不是一个容易被抑制的连接消息。
我试过这样做:
handle(data) {
this.socket.on('data', data => {
return this.socket.resolve(data);
});
this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
return new Promise((resolve, reject) => {
this.socket.resolve = resolve;
this.socket.reject = reject;
this.socket.write(data);
});
}
显然这是行不通的,因为resolve
/reject
在链接和/或并行调用send
多次时会相互覆盖。
也有并行调用send
两次的问题,它解决哪个响应先返回。
我目前有一个使用队列和延迟的实现,但由于队列不断被检查,因此感觉很混乱。
我希望能够做到以下几点:
let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
return c.send('bar', response.param);
//`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
console.log(response);
}).catch(error => console.log(error));
只是补充一下,我对接收的数据没有任何控制,这意味着它不能在流之外被修改。
Edit:所以这似乎是不可能的,因为TCP没有请求-响应流。这是如何实现的,仍然使用承诺,但要么使用单执行(一次一个请求)承诺链或队列。
我将问题简化到最小,并使其在浏览器中可运行:
- Socket类被模拟
- 删除
EventEmitter
的端口、主机和继承信息。
解决方案的工作原理是将新请求附加到承诺链中,但在任何给定的时间点允许最多一个打开/未应答的请求。.send
每次被调用时都会返回一个新的promise,该类负责所有内部同步。因此,可以多次调用.send
,并保证请求处理的正确顺序(FIFO)。我添加的另一个功能是修剪承诺链,如果没有挂起的请求。
注意我完全省略了错误处理,但无论如何都应该针对您的特定用例进行调整。
class SocketMock {
constructor(){
this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) );
this.listeners = {
// 'error' : [],
'data' : []
}
}
send(data){
console.log(`SENDING DATA: ${data}`);
var response = `SERVER RESPONSE TO: ${data}`;
setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),
Math.random()*2000 + 250);
}
on(event, callback){
this.listeners[event].push(callback);
}
}
class SingleRequestCoordinator {
constructor() {
this._openRequests = 0;
this.socket = new SocketMock();
this._promiseChain = this.socket
.connected.then( () => console.log('SOCKET CONNECTED'));
this.socket.on('data', (data) => {
this._openRequests -= 1;
console.log(this._openRequests);
if(this._openRequests === 0){
console.log('NO PENDING REQUEST --- trimming the chain');
this._promiseChain = this.socket.connected
}
this._deferred.resolve(data);
});
}
send(data) {
this._openRequests += 1;
this._promiseChain = this._promiseChain
.then(() => {
this._deferred = Promise.defer();
this.socket.send(data);
return this._deferred.promise;
});
return this._promiseChain;
}
}
var sender = new SingleRequestCoordinator();
sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
setTimeout(() => sender.send('data-4')
.then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
如果您的send()
调用相互混淆,您应该将其保存到缓存中。为了确保接收到的消息与发送的消息匹配,您应该为每条消息分配一些唯一的id
到有效负载中。
所以你的消息发送器看起来像这样
class MyClass extends EventEmitter {
constructor() {
// [redacted]
this.messages = new Map();
}
handle(data) {
this.socket.on('data', data => {
this.messages.get(data.id)(data);
this.messages.delete(data.id);
});
}
send(data) {
return return new Promise((resolve, reject) => {
this.messages.set(data.id, resolve);
this.socket.write(data);
});
}
}
这段代码对消息顺序不敏感,你会得到你想要的API
socket.write(data[, encoding][, callback])
接受回调。你可以在这个回调中拒绝或解析。
class MyClass extends EventEmitter {
constructor(host = 'localhost', port = 10011) {
super(EventEmitter);
this.host = host;
this.port = port;
this.socket = null;
this.requests = null;
this.connect();
}
connect() {
this.socket = net.connect(this.port, this.host);
this.socket.on('connect', () => {
this.requests = [];
this.socket.on('data', this.handle.bind(this));
this.socket.on('error', this.error.bind(this));
});
}
handle(data) {
var [request, resolve, reject] = this.requests.pop();
// I'm not sure what will happen with the destructuring if requests is empty
if(resolve) {
resolve(data);
}
}
error(error) {
var [request, resolve, reject] = this.requests.pop();
if(reject) {
reject(error);
}
}
send(data) {
return new Promise((resolve, reject) => {
if(this.requests === null) {
return reject('Not connected');
}
this.requests.push([data, resolve, reject]);
this.socket.write(data);
});
}
}
未测试,因此不确定方法签名,但这是基本思想。
这里假设每个请求将有一个handle
或error
事件。
我想得越多,这似乎不可能没有额外的信息在您的应用程序数据,如数据包编号匹配一个请求的响应。
它现在实现的方式(也是你的问题的方式),它甚至不确定一个答案将完全匹配一个handle
事件。
- 三星智能电视应用程序;Brightcove示例应用程序远程控制问题
- 应用程序在“关闭”之后停止工作(控制台中没有错误);咕哝的构造”;
- 量角器 iOS 自动化:茉莉花规范超时.重置 Web 驱动程序控制流
- 控制node.js应用程序's通过外部网站举办的活动
- 如何在Java web应用程序中自动控制静态文件(css、js、image)的版本
- 使用JavaScript控制iOS网络应用程序中列表的垂直滚动和水平滑动
- 了解 Node.js 应用程序中的控制流
- 模态对话框加载 jquery 在控制台中执行,但不从应用程序加载.js
- 我的 webgl 着色器程序有什么问题:它在控制台中没有抛出任何错误
- 如何在JMVC/Can.js中处理应用程序的版本控制
- Chrome Extension - Javascript - 使用扩展程序控制 Youtube 视频
- 如何在Ratchet's的聊天应用程序,而不是控制台上的文本框
- 在Raspberry Pi上从网络控制c程序
- 使AJAX应用程序在没有后端控制的情况下可爬网
- 公共Web应用程序的访问控制或加密
- Ionic应用程序和服务器帖子:访问控制允许来源
- Git:如何在目录结构方面分离后端Rest Api和前端应用程序的版本控制
- 推特:仅应用程序身份验证错误访问控制允许原点不允许原点为null
- 三星智能电视应用程序无法控制音量
- 控制何时在ARI/Stasis应用程序中调用函数