RxJS -收集异步操作结果

RxJS - Collect async operation results

本文关键字:异步操作 结果 RxJS      更新时间:2023-09-26

我想对数组的每个元素执行异步操作,并在字典中收集其结果。我目前的方法是:

let asyncOp = () => Rx.Observable.interval(300).take(1);
let dict = {};
Rx.Observable.from(['a', 'b'])
  .mergeMap(el => asyncOp()
              .map(asyncOpRes => dict[el] = asyncOpRes)
              .do(state => console.log('dict state: ', dict))
  )
  .takeLast(2)
  .take(1)
  .map(() => dict)
  .subscribe(res => console.log('dict result: ', res));
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.7/dist/global/Rx.umd.js"></script>

基本上这是我想要的,但它似乎是一个尴尬的使用RxJs操作符。所以我需要以下帮助:

  1. 避免字典突变(尝试使用scan(),但不知道如何在这里使用它。有一个mergeScan()方法,但这里相同)
  2. takeLast和take -的使用应该可以简化吗?

我想我错过了一个RxJS操作符,它可以帮助我简化这个

要"对数组的每个元素执行异步操作并在字典中收集其结果",可以使用mergeMapreduce函数大大简化代码:

import * as Rx from "rxjs/Rx";
const asyncOp = () => Rx.Observable.interval(300).take(1);
Rx.Observable.from(["a", "b"])
    // Perform the async operation on the values emitted from the
    // observable and map the emitted value and async result into
    // an object.
    .mergeMap((key) => asyncOp().map((result) => ({ key, result })))
    // Use reduce to build an object containing the emitted values
    // (the keys) and the async results.
    .reduce((acc, value) => { acc[value.key] = value.result; return acc; }, {})
    .subscribe((value) => { console.log(value); });