主题和弹性逻辑工作流
Subjects and resilience logic workflow
当与弹性运算符(即retry和retryWhen)一起使用时,我想更好地了解受试者的预期行为。
以下代码示例与它们的JSBin对应代码(在示例链接中找到)略有不同,因为我使用了箭头函数和类型以便于使用,这是使用版本4.0.0-4.0.7
我预期的弹性行为可以用下面的例子来表达:
Rx.Observable
.interval(1000)
.flatMap( (count:number) => {
return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
})
.retry()
.take(5);
Output
// 0
// 1
// 2
// 3
// 0 <-- Retry means we start again from scratch (expected)
到目前为止,一切都是一致的,也就是说,在第四次通知发生错误后,整个流从头开始(无状态体系结构的胜利)。
现在,如果我们添加一个多播运营商,并在添加基础Subject(在我的情况下是一个缓冲区为1的ReplaySubject)时,我会感到头疼,例如:
const consumer : Rx.Observable<number> = Rx.Observable
.interval(1000)
.flatMap( (count:number) => {
return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
})
.shareReplay(1) /* multicast(new Rx.ReplaySubject(1)).refCount() */
.retry()
.take(5);
const firstSubscriber : Rx.Disposable = consumer.subscribe( (next:number) => {
console.log('first subscriber: ' + next);
});
setTimeout(() => {
firstSubscriber.dispose(); /* Lets start fresh in that refCount === 0 */
const secondSubscriber : Rx.Disposable = consumer.subscribe( (next) => {
console.log('second subscriber: ' + next);
});
}, 5000 );
Output (before error is thrown)
// "first subscriber: 0"
// "first subscriber: 1"
// "first subscriber: 2"
// "first subscriber: 3"
Output (after error is thrown)
// "first subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
快速查看主题可以识别错误何时出现,主题被标记为inError,每个未来的订户将收到最后一次通知(第46行),并在呼叫onError后立即发出通知(第50行)。
那么,这会给我们留下什么呢?在我看来,当弹性运算符跟随任何其他包含主题的运算符(shareReplay、publish等)时,我不相信你可以使用它。
在这一点上,我认为这种设计成功的唯一方法是确保当发生错误并处理节点时,无论何时使用主题,都需要创建一个新的主题(然后我们开始进入兔子洞)?
多播可能需要一个工厂/主题Selector:
.multicast( () => new Rx.ReplaySubject(1), (source:Rx.ConnectableObservable) => source );
查看源,如果您使用subjectSelector,而不是直接为每个新订阅传递主题,那么subjectSelecter将被调用,并创建一个新的ConnectableObservable(第11行)。
在这一点上,我不确定主题的共享(通过一些缓存)和处理(当检测到错误时)是否真的会给订阅者多播?
在谈到这一点时,我还写了一个RecoverableReplaySubject,我在处理时删除了错误状态,这更多的是为了测试,并希望RxJS团队将此工作流放入是有充分理由的。
如能就这一主题提供任何指导和经验,我们将不胜感激。
感谢
shareReplay
主题在错误和完成方面具有不同的语义。例如,即使底层可观察对象已经完成(refCount == 0
),shareReplay
也不会完成,因此对它的进一步调用将产生(回放)过去的值。参见jsbin(shareReplay)与jsbin(共享)。
var source = Rx.Observable
.interval(100)
.take(5)
.shareReplay()
var first = source.subscribe( function(next) {
console.log('first subscriber: ' + next);
});
setTimeout(function() {
// first.dispose();
var second = source.subscribe( function(next) {
console.log('second subscriber: ' + next);
});
}, 1000 );
否则,你会发现关于shareReplay
的行为(以及对你的问题的讨论)与其他运营商的行为的解释:
- https://github.com/ReactiveX/rxjs/issues/1110
- https://github.com/ReactiveX/rxjs/issues/453(这是一个很长的讨论,从
jhusain commented on Nov 4, 2015
开始)
所提出的解决方案正是为多播运营商使用工厂函数。无论如何,尝试一下你的新设计,看看它是否有效,应该不会太难。
- 如何在批准露天共享中工作流的审核步骤之前添加确认对话框
- 用于绘制工作流图的JavaScript框架
- 将异步工作流更改为承诺(蓝鸟)
- if 子句在 JavaScript 中限定工作流
- 了解 JavaScript 对象实例或工作流
- 主题和弹性逻辑工作流
- 未捕获的类型错误:对象#<HTML对象元素>没有方法'重新绘制工作流'
- 如何在gump工作流中同时在独立文件和依赖文件中输出javascript
- 在CRM 2011中从JavaScript执行工作流
- 访问Alfresco工作流I'我从JavaScript开始
- 使用 JavaScript 框架的设计器工作流
- 节点 JS 使用异步请求同步工作流
- iOS PhoneGap 调试工作流
- 通过 JavaScript 结束 nintex 工作流
- RxJS 和主/工作线程工作流
- CRM 2011 - 使用 JavaScript 启用潜在顾客实体的工作流加载
- 使用Alfresco获取工作流的文件名
- 将流类型检查添加到 grunt 服务器开发工作流
- django 中的 JavaScript 开发工作流
- 在网页中显示工作流元素的正确方法