Rx.Observable.groupBy会清理空流吗?
Will Rx.Observable.groupBy clean up empty streams?
在Node应用程序中,我试图使用RxJS处理事件流。事件流是对许多文档的更改列表。我使用groupBy按文档id将流划分为新的流。但是我想知道,一旦文档在客户端上关闭并且没有为该文档id添加新事件到流中,一旦文档流为空,groupBy会处理该文档的流吗?如果没有,我该如何手动完成?我想避免由于创建新文档流但从未销毁而导致的内存泄漏。
我的建议是:
不要只有documentChanges可观察对象,要有documentEvents可观察对象。
客户端将在打开文档时发送documentOpened事件,在更改文档时发送documentChanged事件,在关闭文档时发送documentClosed事件。
通过同一个可观察对象发送所有三种类型的事件,你建立并保证了一个顺序。如果客户端按此顺序发送documentOpened、documentChanged、documentClosed事件,那么您的服务器将按此顺序看到它们。请注意,对于2个不同客户端发送的事件顺序没有任何保证。这将让您确保特定客户端发送的事件是有序的。
然后,这就是你如何使用groupByUntil
:
documentEvents
.groupByUntil(
function (e) { return e.documentId; }, // key
null, // element
function (group) { // duration selector
var documentId = group.key;
return group.filter(function (e) { return e.eventType === 'documentClosed'; });
})
.flatMap(function (eventsForDocument) {
var documentId = eventsForDocument.key;
return eventsForDocument.whatever(...);
})
.subscribe(...);
另一个简单得多的选项:您可以在空闲时间后使组过期。根据您对事件的处理,这可能绰绰有余。如果文档在5分钟内未被编辑,则此示例使组过期。如果有更多的编辑,则会创建一个新组。
var idleTime = 5 * 60 * 1000;
events
.groupByUntil(
function(e) { return e.documentId; },
null,
function(g) { return g.debounce(idleTime); })
.flatMap...
由于您包含了. net标记,所以我将介绍Rx。
你的问题措辞有点不正确。当且仅当流没有事件时为空。因此,它们不能变为空。不发送数据的流通常不会消耗太多的资源。
在。net中,组在源终止之前不会终止。我们使用'GroupByUntil ',它允许你为每个组指定一个durationSelector流。可观察到的。
这意味着您可能会得到多个具有相同键的非并发流,但如果(通常情况下)您的组流在某些时候被平摊,则无关紧要。
在rxjs中,我们还有groupByUntil。
在Rx-Java中,行为类似的groupByUntil方法被纳入groupBy中,详细信息请参见https://github.com/ReactiveX/RxJava/pull/1727和https://github.com/benjchristensen/RxJava/commit/b9302956832e3e77579f63fd9db25aa60eb4192a。
http://reactivex.io/documentation/operators/groupby.html表示:
如果你取消订阅某个groupedoobservable,该groupedoobservable将被终止。如果源Observable随后发出的项的键值与以这种方式终止的groupedoobservable匹配,groupBy将创建并发出一个新的groupedoobservable来匹配该键值。
所以,在Rx-Java中,你必须取消订阅一个分组的可观察流来终止它。takeUntil
与timer
流可以工作。
附录:
作为对你评论的回应,直到下游操作符取消订阅,流才会终止。groupByUntil的持续时间选择器将导致终止。如果一个文档关闭后不会再次打开,那么你可以发送一个"documentclosed"事件到流中,并使用常规的groupBy和takeWhile测试"documentclosed"。
不要再次打开文档的原因是,使用groupBy(在rx-js和rx.net中),如果已经看到的键重新出现,则不会创建新组。
如果这是一个问题,那么您将需要使用groupByUntil并使用已发布的流来监视documentClosed事件—使用已发布的流将确保您不会获得订阅副作用。
- Knockout observable没有观察到其中一个属性
- 如何在RxJS 5中创建Hot Observable
- 如何获取RxJS Subject或Observable的当前值
- 从Angular 2中的observable中提取数据
- Angular 2中Observable中获取数据后的队列/回调函数
- KnockoutJS清除函数内的Observable
- 从HTML标记中为observable赋予初始值
- angular2在不重新加载的情况下使用Observable(Http)
- 为什么当订阅两次rx 2.3中的observable时,第二次订阅没有收到任何值
- linq.js: GroupBy(), then ToJSON()
- 在Javascript中实现GroupBy最有效的方法是什么
- 从Observable列表中创建一个Observable对象
- lodash sortBy,然后groupBy,维持订单
- 重新连接具有共享RxJS observable的WebSocket
- 将JSON对象转换为敲除observable在IE8中不起作用,但在所有其他浏览器中都起作用
- 无法处理Knockout.js Observable Array中未定义的集合
- 如何应用groupBy,但使用不同的值作为密钥
- 使用下划线groupby按颜色对汽车阵列进行分组
- 如何将变量引用传递给Observable订阅
- Rx.Observable.groupBy会清理空流吗?