RxJS:如何让一个Observer处理多个Observable

RxJS: How to have one Observer process multiple Observables?

本文关键字:一个 Observer 处理 Observable RxJS      更新时间:2023-09-26

我正在使用一个框架来调用我实现的函数。我希望将此函数的参数转换为Observable,并通过一系列Observators发送。我想我可以用一个Subject来做这个,但它的行为并不像我预期的那样。

为了澄清,我有如下代码。我原以为下面的Option 1会起作用,但到目前为止,我还是选择了Option 2,它看起来一点也不习惯。

var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);
// The framework calls this method
function onEvent(eventArray) {
   var eventSource = Rx.Observable.from(eventArray);
   // Option 1: I thought this would work, but it doesn't
   // eventSource.subscribe(eventSubject);
   // Option 2: This does work, but its obviously clunky
  eventSource.subscribe(
      function(event) {
          log.debug("sending to subject");
          eventSubject.onNext(event);
      },
      function(e) {
          log.error(e);
      },
      function() {
          console.log('eventSource onCompleted');
      }
  );
}

正如Brandon已经解释的那样,将eventSubject订阅到另一个可观察对象意味着将eventSubjects onNext、onError和onComplete订阅到那个可观察对象onNext、onError和onComplete。从您的示例来看,您似乎只想订阅onNext。

一旦第一个eventSource完成/errors,您的主题就完成/erros-您的eventSubject正确地忽略了后续eventSoures对其触发的任何onNext/onError。

有多种方式可以只订阅任何事件的onNext来源:

  1. 仅手动订阅onNext。

    resultSource = eventSubject
      .map(processEvent);
    eventSource.subscribe(
        function(event) {
            eventSubject.onNext(event);
        },
        function(error) {
            // don't subscribe to onError
        },
        function() {
            // don't subscribe to onComplete
        }
    );
    
  2. 请使用仅处理订阅事件源onNext/onError的运算符。这就是布兰登的建议请记住,这也订阅了eventSources-onError,在您的示例中,您似乎不想要它

    resultSource = eventSubject
      .mergeAll()
      .map(processEvent);
    eventSubject.onNext(eventSource);
    
  3. 对于onError/onComplete上的eventSources,请使用不调用eventSubject onError/onComplete的观察器。您可以简单地将eventSubjectsonComplete覆盖为一个肮脏的破解,但最好创建一个新的观察者。

    resultSource = eventSubject
      .map(processEvent);
    var eventObserver = Rx.Observer.create(
      function (event) {
        eventSubject.onNext(event);
      }
    );
    eventSubject.subscribe(eventObserver);
    

这只是当你将整个Subject订阅到一个可观察对象时,你最终订阅了该可观察对象的onComplete事件。这意味着当可观察到的完成时,它将完成你的主题。因此,当你得到下一组事件时,它们什么也不做,因为主题已经完成了。

一个解决方案正是你所做的。只需为主题订阅onNext事件。

第二种解决方案,也可以说是更"类似rx"的解决方案,是将传入的数据视为可观测的可观测数据流,并使用mergeAll:将此可观测流压平

var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
  .mergeAll() // subscribe to each inner observable as it arrives
  .map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);
// The framework calls this method
function onEvent(eventArray) {
   var eventSource = Rx.Observable.from(eventArray);
   // send a new inner observable sequence
   // into the subject
   eventSubject.onNext(eventSource);
}

更新:这是一个运行的示例