为rx.js observable缓冲多个订阅者

buffering multiple subscribers to rx.js observable

本文关键字:缓冲 rx js observable      更新时间:2023-09-26

我有

  var subject = new rx.Subject();
  var stream =     rx.Observable.fromEvent(blah, 'event')
                  .filter(blah)
                  .map(blah)
                  .subscribe(subject);
                return subject;

然后我将主题传递给几个不同的处理程序,这些处理程序将以不同的方式和不同的速度处理事件。
每个处理器中都是

subject.subscribe(async function (x) {
        const func = self[x.eventName];
        if (func) {
          await eventHandlerWrapper(self.handlerName, func, x);
        }
      })
我有两个问题,a)如果事件来得超级快,处理程序是否会同步处理它们,并以正确的顺序给出我的方式?b)如果不同的处理程序以不同的速度处理事件,它们是否都要等到最慢的处理程序通过后才提供下一个事件?或者他们会按照自己的节奏缓冲和处理?

谢谢你,R

首先,主题的创建可以简化如下:

const subject = rx.Observable.fromEvent(blah, 'event')
              .filter(blah)
              .map(blah)
              .share();
share方法将从流中创建一个Subject。如果您将此主题实例返回给每个订阅者,您将得到相同的行为,并且看起来更好。
 a) if the events come in super fast is the handler going to process
 them synchronously and in the right order given the way I have it?

事件将按照正确的顺序一个接一个地推入整个链。意思是,通过'fromEvent'进入的事件将被推入整个链,直到您订阅它的点,然后处理下一个值(除非在两者之间有async操作符:))。Ben Lesh在angular connect 2015: https://www.youtube.com/watch?v=KOOT7BArVHQ上解释了这一点(你可以观看整个演讲,但它在min 17左右,他比较了数组和可观察对象)。

b) if the different handlers handle the event at different speeds are 
they all going to wait till the slowest handler is through before the    
next event is provided? or will they all sort of buffer and handle at  
they're own pace?

他们将按照自己的节奏处理这些事件。检查以下示例:

let interval$ = Rx.Observable.interval(1000).share();
interval$.concatMap((val) => {
    console.log('called');
    return Rx.Observable.of(val).delay(3000)
  })
  .subscribe((val) => console.log("slow ", val));
interval$.subscribe((val) => console.log("fast ", val));

这里我使用了一个区间可观察对象,我将其转换为一个主题。它会每秒发送一个事件。我有一个订阅,它接受一个值,处理这个值(需要2秒),然后接受下一个值(使用concatMap)。另一个订阅会立即处理它们。如果运行这段代码(此处为jsbin: https://jsbin.com/zekalab/edit?js,console),您将看到它们都以自己的速度处理事件。

所以它们不等待最慢的处理程序,它将被内部缓冲。

如果最慢的处理器比抛出事件的频率慢,那么您所描述的情况可能会出现潜在的危险情况。在这种情况下,您的缓冲区将不断增长,最终您的应用程序将崩溃。这个概念叫做背压。你接收事件的速度比处理它们的速度要快。在这种情况下,您需要在最慢的处理器上使用像'buffer'或'window'这样的操作符来避免这种情况。