有一种方法可以创建此流序列

There's a way of create this sequence of Streams?

本文关键字:创建 方法 一种      更新时间:2023-09-26

我正在尝试实现这个大理石图,具有 N 个 sN$ 的 hipotesis,我正在将此流添加到 main$。

s1$    +--1--------------------99--------------------->
s2$    +------3--------7------------------------------>
main$  +---[1]-[1, 3]---[1, 7]---[99, 7]-------------->

现在我有一个接近,但有"重复"

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()
main$
  .scan((a, c) => [...a, c], [])
  .subscribe(v => console.log(v))
s1$.subscribe(x => main$.onNext(x))
s2$.subscribe(x => main$.onNext(x))    
s1$.onNext(3)
s2$.onNext(1)
s1$.onNext(6)
s2$.onNext(44)
/*
  Expect:
    [3]
    [3, 1]
    [6, 1]
    [6, 44]
*/
/*
  What I have:
     [3]
     [3, 1]
     [3, 1, 6]
     [3, 1, 6, 44]
*/

有没有办法做到这一点?我也尝试将流 sN$ 添加到 main$ 中:

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()
main$
  .mergeAll()
  .scan((a, c) => [...a, c], [])
  .subscribe(
    (v) => console.log(v)
  )
main$.onNext(s1$)
main$.onNext(s2$)
s1$.onNext(3)
s2$.onNext(1)
s1$.onNext(6)
s2$.onNext(44)

您可以使用 combineLatest。虽然这仍然要求每个流都以一个值开头,但您可以使用 startWith 作为前缀null值,使每个流都以某些内容开头。

const source = Rx.Observable.combineLatest(
  s1.startWith(void 0),
  s2.startWith(void 0),
  s3.startWith(void 0),
  (s1, s2, s3) => [s1, s2, s3])

(可选)可以从生成的数组中删除undefined值。

现在,我们可以将其扩展为使用流的变量列表。@xgrommx学分。

main$
 .scan((a, c) => a.concat(c), [])
 .switch(obs => Rx.Observable.combineLatest(obs))

我们还可以使用c.shareReplay(1)使流记住最后一个值,当我们switch .但是,这不会与 c.startWith(void 0) ,因此我们可以使用其中之一。

例:

    const main$ = new Rx.Subject()
    const s1$ = new Rx.Subject(1)
    const s2$ = new Rx.Subject(1)
    const s3$ = new Rx.Subject(1)
    const s4$ = new Rx.Subject(1)
    main$
     .scan((a, c) => a.concat(c.shareReplay(1)), [])
     .map(obs => Rx.Observable.combineLatest(obs))
     .switch()
     .map(v => v.filter(e => !!e))
     .map(v => v.join(','))
     .subscribe(v => $('#result').append('<br>' + v))
    main$.onNext(s1$)
    s1$.onNext(1)
    main$.onNext(s2$)
    s2$.onNext(void 0) // Since we can't use startWith
    main$.onNext(s3$)
    s3$.onNext(5)
    s1$.onNext(55)
    s2$.onNext(12)
    s2$.onNext(14)
    s3$.onNext(6)
    main$.onNext(s4$)
    s4$.onNext(999)
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
    <div id="result"></div>

我终于用一些过滤器解决了我在startWith()开始的空值的问题:

main$
  .scan((a, c) => [...a, c.startWith(null).shareReplay(1)], [])
  .map(obs => Observable.combineLatest(obs))
  .switch()
  .map((x) => x.filter((x) => x != null))
  .filter((x) => x.length)

看起来不可读(就像任何 Rx 序列一样,但如果你画弹珠是完全有意义的!