操作 RxJS 流并发布结果的可观察量的正确方法是什么?
What is the correct way to manipulate RxJS streams and publish an Observable of the results?
我有一个websocket连接,它正在使用ReplaySubect生成内部消息事件。 我处理这些事件并为某些消息添加延迟。 在内部,我使用 publish().refCount() 两次,一次在内部重播主题上,另一次在已发布的输出流上。
内部主体是否应该同时调用"发布"和"refCount"? 我使用"发布",因为我有多个订阅者,但我不完全确定何时使用"refCount"。
可以只处理内部主题吗? 这会清理其他一切吗?
订阅"eventStream"的人应该获得最新版本,但连接不应该等待任何订阅者
示例代码:
function Connection(...) {
var messageSubject = new Rx.ReplaySubject(1);
var messageStream = messageSubject.publish().refCount();
// please ignore that we're not using rxdom's websocket.
var ws = new WebSocket(...);
ws.onmessage = function(messageEvent) {
var message = JSON.parse(messageEvent.data);
messageSubject.onNext(message);
}
ws.onclose = function(closeEvent) {
messageSubject.dispose(); // is this all I need to dispose?
}
var immediateRevisions = messageStream
.filter((e) => e[0] === "immediate")
.map((e) => ["revision", e[1]]);
var delayedRevisions = messageStream
.filter((e) => e[0] === "delayed")
.map((e) => ["revision", e[1]]).delay(1000);
var eventStream = Rx.Observable.merge(immediateRevisions, delayedRevisions).publish().refCount();
Object.defineProperties(this, {
"eventStream": { get: function() { return eventStream; }},
});
}
// using the eventStream
var cxn = new Connection(...)
cxn.eventStream.subscribe((e) => {
if (e[0] === "revision") {
// ...
}
});
发布和 refCount 基本上是 shareReplay
在 RxJS4 中所做的。 老实说,如果你真的想保证即使订阅计数低于 1,你也应该让你的可观察对象是"温暖的",然后使用 ReplaySubject 作为订阅者。 例如:
const wsStream = Observable.create(observer => {
ws.onmessage = message => observer.next(message);
ws.onclose = () => observer.complete();
});
const latestWsMessages = new ReplaySubject(1);
wsStream.subscribe(latestWsMessages);
请务必查看可观察量的工作原理:创建可观察量后,通常每个订阅者都会调用订阅(冷),但在这种情况下,您可能需要一个热可观察量,以便您有多个订阅者共享一个订阅。请参阅此处的 Andre 视频和有关创建可观察量的 RxJS 文档以获取更多信息。
此外,尽管类可能很有用,但在这种情况下,您只需要一个makeWebsocketObservable(WebsocketConfig): Observable<WebsocketEvent>
函数
相关文章:
- 这是什么 ==- javascript 运算符
- 我的单元测试选项是什么
- 打破承诺链的好方法是什么
- 在AngularJS应用程序中使用封装指令和路由的推荐方式是什么
- Javascript中的空白是什么
- 是什么让一个“;Uncaught RangeError:超过了最大调用堆栈大小“;错误(Chrome,在其他浏览器中显示
- 在JavaScript中拆分日期字符串的更好方法是什么
- 将jQuery.ech()方法转换为本地JavaScript抽象的最佳方法是什么
- 处理浮点错误的最佳方法是什么
- javascript导入的最佳实践是什么
- 基于窗口宽度jquery的函数的替代方法是什么
- 淘汰js可观察扩展的执行顺序是什么
- 在jQuery数据表中使用AngularJS观察者的方法是什么
- 突变观察者收到的突变记录的顺序是什么?
- Knockout的可写计算在AngularJS中可观察的模拟是什么?
- 操作 RxJS 流并发布结果的可观察量的正确方法是什么?
- ko是什么?在这种情况下观察到的
- 在响应式表达式中,是什么启动了一个热门的可观察序列?
- KnockoutJS的select会以一种意想不到的方式影响可观察对象,这取决于select是什么时候创建的
- 识别一个可观察对象的更新者的最好方法是什么?