RxJS对已发出的事件nodejs进行分组

RxJS grouping emitted events nodejs

本文关键字:nodejs 事件 RxJS      更新时间:2023-09-26

我正在查询数据库,并将结果作为事件"db_row_received"的逐行流检索。我正试图按公司Id对这些结果进行分组,但我在订阅中没有得到任何输出。

数据库行格式如下所示。

 // row 1
    {
        companyId: 50,
        value: 200
    }
    // row 2   
    {
        companyId: 50,
        value: 300
    }
    // row 3 
    {
        companyId: 51,
        value: 400
    }

代码:

var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
                             return acc + v.value;
                          }, 0));
var subscription = selectMany.subscribe(function (obs) {
                        console.log("value: ", obs);
                   }

预期输出:

value: 500    // from the group with companyId 50
value: 400    // from the group with companyId 51

实际输出:订阅不输出任何内容,但在使用Rx时有效。Observable.fromArray(someArray)

有人能告诉我哪里出了问题吗?

因此,问题是只有当底层流completed时,reduce才会产生单个值。由于事件发射器是一种无限源,因此它始终处于活动状态。

看看下面的片段——第一个例子完成了,另一个没有。

const data = [
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
];
Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT', d.key, d.sum))
  .subscribe();
  
Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .merge(Rx.Observable.never()) // MERGIN NEVER IN
  // .take(data.length) // UNCOMMENT TO MITIGATE NEVER
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
  .subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>

我不知道你的具体用例,但脑海中最常见的两件事是:

  • 使用CCD_ 3(可能具有去抖动)
  • 如果存在指示下属流结束的事件,则使用takeUntil