RxJS:如何让一个Observer处理多个Observable
RxJS: How to have one Observer process multiple Observables?
我正在使用一个框架来调用我实现的函数。我希望将此函数的参数转换为Observable,并通过一系列Observators发送。我想我可以用一个Subject来做这个,但它的行为并不像我预期的那样。
为了澄清,我有如下代码。我原以为下面的Option 1
会起作用,但到目前为止,我还是选择了Option 2
,它看起来一点也不习惯。
var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// Option 1: I thought this would work, but it doesn't
// eventSource.subscribe(eventSubject);
// Option 2: This does work, but its obviously clunky
eventSource.subscribe(
function(event) {
log.debug("sending to subject");
eventSubject.onNext(event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSource onCompleted');
}
);
}
正如Brandon已经解释的那样,将eventSubject订阅到另一个可观察对象意味着将eventSubjects onNext、onError和onComplete订阅到那个可观察对象onNext、onError和onComplete。从您的示例来看,您似乎只想订阅onNext。
一旦第一个eventSource完成/errors,您的主题就完成/erros-您的eventSubject正确地忽略了后续eventSoures对其触发的任何onNext/onError。
有多种方式可以只订阅任何事件的onNext来源:
-
仅手动订阅onNext。
resultSource = eventSubject .map(processEvent); eventSource.subscribe( function(event) { eventSubject.onNext(event); }, function(error) { // don't subscribe to onError }, function() { // don't subscribe to onComplete } );
-
请使用仅处理订阅事件源onNext/onError的运算符。这就是布兰登的建议请记住,这也订阅了eventSources-onError,在您的示例中,您似乎不想要它
resultSource = eventSubject .mergeAll() .map(processEvent); eventSubject.onNext(eventSource);
-
对于onError/onComplete上的eventSources,请使用不调用eventSubject onError/onComplete的观察器。您可以简单地将eventSubjectsonComplete覆盖为一个肮脏的破解,但最好创建一个新的观察者。
resultSource = eventSubject .map(processEvent); var eventObserver = Rx.Observer.create( function (event) { eventSubject.onNext(event); } ); eventSubject.subscribe(eventObserver);
这只是当你将整个Subject
订阅到一个可观察对象时,你最终订阅了该可观察对象的onComplete
事件。这意味着当可观察到的完成时,它将完成你的主题。因此,当你得到下一组事件时,它们什么也不做,因为主题已经完成了。
一个解决方案正是你所做的。只需为主题订阅onNext
事件。
第二种解决方案,也可以说是更"类似rx"的解决方案,是将传入的数据视为可观测的可观测数据流,并使用mergeAll:将此可观测流压平
var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
.mergeAll() // subscribe to each inner observable as it arrives
.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// send a new inner observable sequence
// into the subject
eventSubject.onNext(eventSource);
}
更新:这是一个运行的示例
- 创建一个类似链接的按钮,并通过Javascript函数打开一个新的弹出窗口
- jQuery:循环一个具有不同超时值的循环
- 当包含另一个asp文件时,是否也包含所有引用的样式和脚本页面
- 从javascript创建一个列表
- 节点导出返回一个空对象
- 使用clickToggle并在单击另一个元素时关闭元素
- 我可以在json对象中添加一个函数吗
- 使用javascript将动态表从一个html页面打印到另一个html页
- 将jsp文件下拉列表中的选定项分配给一个java变量(比如String selection)
- 表追加而不附加最后一个元素
- 我如何找到一个句子中的所有空格并替换忽略它们
- D3在一个调用中绘制不同的SVG形状,没有可见性
- 如何在android中使用phonegap将文件从一个文件夹移动/复制到另一个文件夹
- 使用类从一个标记中双击事件
- Javascript,访问一个主要对象模块模式中的每个对象
- 如果使用 lodash 将属性存在于另一个对象中,则向对象添加属性
- 如何在elfinder插件(一个文件管理器插件)上获得上传前事件
- 我应该如何从xml文件构建一个javascript页面
- RxJS:如何让一个Observer处理多个Observable
- 如何将一个Observable解析为另一个Observer?-rxjs