如何创建一个RxJS缓冲区,将NodeJS中的元素分组,但不依赖于永远运行的间隔

How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?

本文关键字:元素 永远 运行 依赖于 创建 何创建 一个 NodeJS 缓冲区 RxJS      更新时间:2023-09-26

我正在NodeJS中使用Rx.Observable.fromEvent从应用程序捕获事件。这些将使用请求发送到另一台服务器(https://www.npmjs.com/package/request)。为了避免高网络负载,我需要在发送请求之间的给定超时时缓冲这些事件。

问题

使用bufferWithTime(200)将保持节点进程的运行,我不知道应用程序何时完成关闭流。

有没有任何方法可以使用Rx缓冲区来表示:

  1. 当按下元素1时,设置计时器
  2. 当元素2和3在定时器到期之前到达时,将它们推送到数组[1,2,3](缓冲区)
  3. 当计时器到期时,将[1,2,3]数组发送到管道中
  4. 如果元素4在定时器到期后出现,则设置一个新的定时器并重新开始

如果没有推送任何元素,则不会启动计时器,这将使进程退出。

我最初的方法是:

Rx.Observable
     .fromEvent(eventEmitter, 'log')
     .bufferWithTime(200) // this is the issue
     .map(addEventsToRequestOption)
     .map(request)
     .flatMap(Promise.resolve)
     .subscribe(log('Response received'))

使用delay运算符的拟议实现:

function emits(who){
  return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}
var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");
var delayedSource$ = source.delay(1200);
var buffered$ = source
     .buffer(function () { return  delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})
buffered$.subscribe(emits("buffer"));

jsbin在这里:http://jsbin.com/wilurivehu/edit?html,js,控制台,输出

您可能需要拆分流,并使用第二部分来触发第一部分。

var source = Rx.Observable.fromEvent(eventEmitter, 'log');
var closer = source.flatMapFirst(Rx.Observable.timer(2000));
source
     .buffer(closer)
     .map(addEventsToRequestOption)
     .flatMap(function(x) { Promise.resolve(request(x)); })
     //I assume this log method returns a function?
     .subscribe(log('Response received'));

source.flatMapFirst(Rx.Observable.timer(2000))是这里的重要线路。它创建一个Observable,该Observable生成一个计时器,该计时器将在2000毫秒后触发。当第一个事件出现时,它将启动计时器。只要计时器正在运行,flatMapFirst就会忽略后续事件。当计时器最终发出时,它将触发缓冲区发出其当前缓冲区并重新启动。

请参阅关于buffer的文档,该文档具有边界Observable