操作 RxJS 流并发布结果的可观察量的正确方法是什么?

What is the correct way to manipulate RxJS streams and publish an Observable of the results?

本文关键字:观察 是什么 方法 并发 RxJS 布结果 结果 操作      更新时间:2023-09-26

我有一个websocket连接,它正在使用ReplaySubect生成内部消息事件。 我处理这些事件并为某些消息添加延迟。 在内部,我使用 publish().refCount() 两次,一次在内部重播主题上,另一次在已发布的输出流上。

内部主体是否应该同时调用"发布"和"refCount"? 我使用"发布",因为我有多个订阅者,但我不完全确定何时使用"refCount"。

可以只处理内部主题吗? 这会清理其他一切吗?

订阅"eventStream"的人应该获得最新版本,但连接不应该等待任何订阅者

示例代码:

function Connection(...) {
    var messageSubject = new Rx.ReplaySubject(1);
    var messageStream = messageSubject.publish().refCount();

    // please ignore that we're not using rxdom's websocket.
    var ws = new WebSocket(...);
    ws.onmessage = function(messageEvent) {
      var message = JSON.parse(messageEvent.data);
      messageSubject.onNext(message);
    }
    ws.onclose = function(closeEvent) {
      messageSubject.dispose();  // is this all I need to dispose?
    }

    var immediateRevisions = messageStream
        .filter((e) => e[0] === "immediate")
        .map((e) => ["revision", e[1]]);
    var delayedRevisions = messageStream
        .filter((e) => e[0] === "delayed")
        .map((e) => ["revision", e[1]]).delay(1000);
    var eventStream = Rx.Observable.merge(immediateRevisions, delayedRevisions).publish().refCount();
    Object.defineProperties(this, {
      "eventStream": { get: function() { return eventStream; }},
    });
}

// using the eventStream
var cxn = new Connection(...)
cxn.eventStream.subscribe((e) => {
    if (e[0] === "revision") {
        // ...
    }
});

发布和 refCount 基本上是 shareReplay 在 RxJS4 中所做的。 老实说,如果你真的想保证即使订阅计数低于 1,你也应该让你的可观察对象是"温暖的",然后使用 ReplaySubject 作为订阅者。 例如:

const wsStream = Observable.create(observer => {
  ws.onmessage = message => observer.next(message);
  ws.onclose = () => observer.complete();
});
const latestWsMessages = new ReplaySubject(1);
wsStream.subscribe(latestWsMessages);
请务必查看可观察量

的工作原理:创建可观察量后,通常每个订阅者都会调用订阅(冷),但在这种情况下,您可能需要一个热可观察量,以便您有多个订阅者共享一个订阅。请参阅此处的 Andre 视频和有关创建可观察量的 RxJS 文档以获取更多信息。

此外,尽管类可能很有用,但在这种情况下,您只需要一个makeWebsocketObservable(WebsocketConfig): Observable<WebsocketEvent>函数