Rx.Observable.groupBy会清理空流吗?

Will Rx.Observable.groupBy clean up empty streams?

本文关键字:Observable groupBy Rx      更新时间:2023-09-26

在Node应用程序中,我试图使用RxJS处理事件流。事件流是对许多文档的更改列表。我使用groupBy按文档id将流划分为新的流。但是我想知道,一旦文档在客户端上关闭并且没有为该文档id添加新事件到流中,一旦文档流为空,groupBy会处理该文档的流吗?如果没有,我该如何手动完成?我想避免由于创建新文档流但从未销毁而导致的内存泄漏。

我的建议是:

不要只有documentChanges可观察对象,要有documentEvents可观察对象。

客户端将在打开文档时发送documentOpened事件,在更改文档时发送documentChanged事件,在关闭文档时发送documentClosed事件。

通过同一个可观察对象发送所有三种类型的事件,你建立并保证了一个顺序。如果客户端按此顺序发送documentOpeneddocumentChangeddocumentClosed事件,那么您的服务器将按此顺序看到它们。请注意,对于2个不同客户端发送的事件顺序没有任何保证。这将让您确保特定客户端发送的事件是有序的。

然后,这就是你如何使用groupByUntil:

documentEvents
    .groupByUntil(
        function (e) { return e.documentId; }, // key
        null, // element
        function (group) { // duration selector
            var documentId = group.key;
            return group.filter(function (e) { return e.eventType === 'documentClosed'; });
      })
    .flatMap(function (eventsForDocument) {
        var documentId = eventsForDocument.key;
        return eventsForDocument.whatever(...);
    })
    .subscribe(...);

另一个简单得多的选项:您可以在空闲时间后使组过期。根据您对事件的处理,这可能绰绰有余。如果文档在5分钟内未被编辑,则此示例使组过期。如果有更多的编辑,则会创建一个新组。

var idleTime = 5 * 60 * 1000;
events
    .groupByUntil(
        function(e) { return e.documentId; },
        null,
        function(g) { return g.debounce(idleTime); })
    .flatMap...

由于您包含了. net标记,所以我将介绍Rx。

你的问题措辞有点不正确。当且仅当流没有事件时为空。因此,它们不能变为空。不发送数据的流通常不会消耗太多的资源。

在。net中,组在源终止之前不会终止。我们使用'GroupByUntil ',它允许你为每个组指定一个durationSelector流。可观察到的。

这意味着您可能会得到多个具有相同键的非并发流,但如果(通常情况下)您的组流在某些时候被平摊,则无关紧要。

在rxjs中,我们还有groupByUntil。

在Rx-Java中,行为类似的groupByUntil方法被纳入groupBy中,详细信息请参见https://github.com/ReactiveX/RxJava/pull/1727和https://github.com/benjchristensen/RxJava/commit/b9302956832e3e77579f63fd9db25aa60eb4192a。

http://reactivex.io/documentation/operators/groupby.html表示:

如果你取消订阅某个groupedoobservable,该groupedoobservable将被终止。如果源Observable随后发出的项的键值与以这种方式终止的groupedoobservable匹配,groupBy将创建并发出一个新的groupedoobservable来匹配该键值。

所以,在Rx-Java中,你必须取消订阅一个分组的可观察流来终止它。takeUntiltimer流可以工作。

附录:

作为对你评论的回应,直到下游操作符取消订阅,流才会终止。groupByUntil的持续时间选择器将导致终止。如果一个文档关闭后不会再次打开,那么你可以发送一个"documentclosed"事件到流中,并使用常规的groupBy和takeWhile测试"documentclosed"。

不要再次打开文档的原因是,使用groupBy(在rx-js和rx.net中),如果已经看到的键重新出现,则不会创建新组。

如果这是一个问题,那么您将需要使用groupByUntil并使用已发布的流来监视documentClosed事件—使用已发布的流将确保您不会获得订阅副作用。