为rx.js observable缓冲多个订阅者
buffering multiple subscribers to rx.js observable
我有
var subject = new rx.Subject();
var stream = rx.Observable.fromEvent(blah, 'event')
.filter(blah)
.map(blah)
.subscribe(subject);
return subject;
然后我将主题传递给几个不同的处理程序,这些处理程序将以不同的方式和不同的速度处理事件。
每个处理器中都是
subject.subscribe(async function (x) {
const func = self[x.eventName];
if (func) {
await eventHandlerWrapper(self.handlerName, func, x);
}
})
我有两个问题,a)如果事件来得超级快,处理程序是否会同步处理它们,并以正确的顺序给出我的方式?b)如果不同的处理程序以不同的速度处理事件,它们是否都要等到最慢的处理程序通过后才提供下一个事件?或者他们会按照自己的节奏缓冲和处理?谢谢你,R
首先,主题的创建可以简化如下:
const subject = rx.Observable.fromEvent(blah, 'event')
.filter(blah)
.map(blah)
.share();
share方法将从流中创建一个Subject。如果您将此主题实例返回给每个订阅者,您将得到相同的行为,并且看起来更好。
a) if the events come in super fast is the handler going to process
them synchronously and in the right order given the way I have it?
事件将按照正确的顺序一个接一个地推入整个链。意思是,通过'fromEvent'进入的事件将被推入整个链,直到您订阅它的点,然后处理下一个值(除非在两者之间有async操作符:))。Ben Lesh在angular connect 2015: https://www.youtube.com/watch?v=KOOT7BArVHQ上解释了这一点(你可以观看整个演讲,但它在min 17左右,他比较了数组和可观察对象)。
b) if the different handlers handle the event at different speeds are
they all going to wait till the slowest handler is through before the
next event is provided? or will they all sort of buffer and handle at
they're own pace?
他们将按照自己的节奏处理这些事件。检查以下示例:
let interval$ = Rx.Observable.interval(1000).share();
interval$.concatMap((val) => {
console.log('called');
return Rx.Observable.of(val).delay(3000)
})
.subscribe((val) => console.log("slow ", val));
interval$.subscribe((val) => console.log("fast ", val));
这里我使用了一个区间可观察对象,我将其转换为一个主题。它会每秒发送一个事件。我有一个订阅,它接受一个值,处理这个值(需要2秒),然后接受下一个值(使用concatMap)。另一个订阅会立即处理它们。如果运行这段代码(此处为jsbin: https://jsbin.com/zekalab/edit?js,console),您将看到它们都以自己的速度处理事件。
所以它们不等待最慢的处理程序,它将被内部缓冲。
如果最慢的处理器比抛出事件的频率慢,那么您所描述的情况可能会出现潜在的危险情况。在这种情况下,您的缓冲区将不断增长,最终您的应用程序将崩溃。这个概念叫做背压。你接收事件的速度比处理它们的速度要快。在这种情况下,您需要在最慢的处理器上使用像'buffer'或'window'这样的操作符来避免这种情况。
相关文章:
- RX受试者使用上一个值添加新发射
- 为什么我得到“;没有方法'indexOf'"在ExtJS4.2中尝试在网格上使用缓冲渲染器时出错
- 如何检查html5视频是否有足够的缓冲,以便在不停止缓冲的情况下播放
- HTML5相机缓冲和延迟(延迟镜像)
- 为什么当订阅两次rx 2.3中的observable时,第二次订阅没有收到任何值
- 无效的非字符串/缓冲区块Node.js
- 在 Node.js 中使用流来缓冲 HTTP 通信
- HTML 5 音频 :是否有在更改缓冲持续时间时触发的事件
- 帖子的 RX JS 聚合
- Rx-subscribe方法如何在不传递上下文的情况下保留上下文
- 乙烯基缓冲液的用途是什么
- HTML5<音频>:点击下一首歌曲时停止加载/缓冲
- 实现缓冲转换流
- RxJS-缓冲多达n个项目并发出这些缓冲区
- Rx如何在Web(客户端)上真正工作
- Rx 去抖运算符与第一个和最后一个
- 使 Flowplayer-Flash 在缓冲时立即播放视频
- 缓冲 html5 视频时禁用亮度更改
- 如何构建一个在上一个 ajax 承诺解析后等待一段时间的 rx 轮询器
- 为rx.js observable缓冲多个订阅者