在RxJS中,对一段时间内的事件进行计数,并每秒产生一次总和

Count events over a period of time and yield the sum once every second in RxJS

本文关键字:一次 一段时间 RxJS 事件      更新时间:2023-09-26

是否有可能在一段时间内计数事件并在RxJS中每秒产生一次总和?我有一个连续不断的事件流。每隔1秒,我希望获得过去5分钟窗口内的事件总数。这个想法是用它来填充一个实时图。

我知道如何用传统的方式做到这一点,但是我真的很想了解如何用响应式编程来做到这一点。

我是这样处理的。

创建一个可观察对象,它只计算接收到的事件数量,并将其作为运行总数(通过scan)发出。创建第二个可观察对象,它只是延迟5分钟的运行总数。创建第三个可观察对象,从第一个可观察对象中减去延迟的可观察对象。这将产生小于5分钟的事件总数。创建最后一个可观察对象,每秒对第三个可观察对象进行一次采样。

const totalLast5Minutes = eventSource.publish(events => {
    const runningTotal = events
        .scan((e, total) => total + 1, 0)
        .startWith(0);
    const totalDelayed5Minutes = runningTotal
        .delay(5000 * 60)
        .startWith(0);
    return Rx.Observable
        .combineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});
// only sample the value once per second
Rx.Observable
    .interval(1000)
    .withLatestFrom(totalLast5Minutes, (interval, total) => total)
    .subscribe(total => console.log(`total=${total}`));

这里有另一种我认为可能更简单的方法。你可以使用windowWithTime——参见ReactiveX和RxJS文档。你可以创建重叠的窗口,所以你可以有一个5分钟的事件窗口,然后是另一个5分钟的事件窗口,一秒后开始,再过一秒,等等。

如果计算每个窗口的计数,您将得到一系列5分钟事件的计数,间隔一秒。它看起来像这样:

source.windowWithTime(5 * 60000, 1000)   // create 5 minute windows, one second apart
      .flatMap(window => window.count()) // take each window and get the count once it completes (after 5 minutes)
      .subscribe(count => console.log(count));

我已经用它来做你所说的——在一个不间断的事件流中获得一个时间窗口内的事件速率。通过使窗口重叠,您正在生成流上活动的移动平均。