RxJS对已发出的事件nodejs进行分组
RxJS grouping emitted events nodejs
我正在查询数据库,并将结果作为事件"db_row_received"的逐行流检索。我正试图按公司Id对这些结果进行分组,但我在订阅中没有得到任何输出。
数据库行格式如下所示。
// row 1
{
companyId: 50,
value: 200
}
// row 2
{
companyId: 50,
value: 300
}
// row 3
{
companyId: 51,
value: 400
}
代码:
var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
return acc + v.value;
}, 0));
var subscription = selectMany.subscribe(function (obs) {
console.log("value: ", obs);
}
预期输出:
value: 500 // from the group with companyId 50
value: 400 // from the group with companyId 51
实际输出:订阅不输出任何内容,但在使用Rx时有效。Observable.fromArray(someArray)
有人能告诉我哪里出了问题吗?
因此,问题是只有当底层流complete
d时,reduce
才会产生单个值。由于事件发射器是一种无限源,因此它始终处于活动状态。
看看下面的片段——第一个例子完成了,另一个没有。
const data = [
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'A', v: 1},
{k: 'A', v: 1},
];
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT', d.key, d.sum))
.subscribe();
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.merge(Rx.Observable.never()) // MERGIN NEVER IN
// .take(data.length) // UNCOMMENT TO MITIGATE NEVER
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
.subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>
我不知道你的具体用例,但脑海中最常见的两件事是:
- 使用CCD_ 3(可能具有去抖动)
- 如果存在指示下属流结束的事件,则使用
takeUntil
相关文章:
- NodeJS-readline暂停和恢复事件发射器(逐行读取)
- 在Nodejs中堆叠异步回调事件的最佳方式
- 在NodeJS中使用类函数作为事件侦听器
- 通过进程使用 NodeJS 全局事件是个好主意吗?
- Nodejs事件多次触发
- NodeJS中的事件,事件正在setInterval内发出,但不能没有setInterval
- nodeJS 等待无法承诺的事件
- 是否可以在 NodeJS 中模拟键盘/鼠标事件
- 当事件发生时调用nodejs中的函数
- NodeJS&Socket.IO:发出请求事件并获取响应,我应该在何时/何地绑定侦听器
- 如何在 nodejs 中维护事件循环中的持久数据
- NodeJS如何将onchange事件添加到字符串中
- 使用 on nodejs 从另一个模块捕获事件
- NodeJS + SocketIO 大套接字事件管理
- “事件发射器内存泄漏”发生,尽管使用了最新版本的nodejs
- 具有多个“数据”事件处理程序和竞争条件的 NodeJS 流
- Nodejs正在侦听来自http.get()的数据事件的响应流
- 在nodejs express模块中处理所有事件之前发送响应
- settimeout正在阻塞事件循环Nodejs检查下一个示例:
- NodeJS事件循环基础