如何在 RxJS 中直接链接发布子系统中的生产者和消费者,而无需主题

How can I directly link producers and consumers in a pub sub system, without subjects, in RxJS?

本文关键字:消费者 生产者 子系统 RxJS 接发 链接      更新时间:2024-05-12

>Edit

我认为这个问题的实施相当复杂,现在已经 3 个月无法弄清楚了。我在这里的另一个问题中重新措辞了它:RxJS5 - 如何在不使用主题并排除已完成的主题的情况下缓存聚合流的最后一个值?,希望能更简洁地总结我所追求的内容。谢谢大家的帮助!

源语言

现在,我已经学会了如何处理 RxJS 中的状态,方法是通过扫描将一系列部分应用的状态操作函数映射到我的状态上。但是,我现在想更进一步,通过数据平面(pubsub 层(以声明方式连接"生产者"和"消费者"之间的流,而无需通过主题管道连接流。

网上有很多教程涵盖了这一点,但它们基本上都只是在每个流名称或频道的中间层中强制使用 publish 作为 Subject.next。这是我当前的实现,但它需要为每个永无止境的流创建一个 ReplaySubject(1((用于缓存迟到的使用者的值(,因为任何获得流的使用者都会收到一个引用,如果当前没有生产者流式传输到该名称的通道时删除主题,则该引用将无效。

我想直接连接流,并让使用者接收流,这些流是特定名称的所有活动已发布流的聚合。这仍然需要主体在初始注册的生产者流(格式为 {name, stream} 的创建者流的传入流(中进行管道处理。

然后我想将所有同名流合并到一个每个注册消费者作为参考接收的流中(想想一个过滤器服务接收对filter.add的引用,这是所有创建过滤器的活动生产者的合并( - 但让这个流重新合并,以反应方式,并且消费者到该流的链接仍然有效, 如果具有相同名称的新生产者也注册。任何迟到的使用者还应收到该聚合流的最后一个缓存值。

通过这种方式,每次在 pubsub 层上公开新流时,都需要动态重新评估每个聚合流,因此将流包装在"getActive"函数中(如此处(不起作用,因为这是必不可少的,并且仅在首次获取流时发生一次,而不是在每次发布新流时懒惰地为所有消费者重新评估。

结果应为以:

  • 从不完成;
  • 聚合特定名称的所有活动流的结果,并丢弃不再活跃的流;
  • 延迟更新收到对该流的引用的所有使用者,
  • 以便在发布同名的新流时,使用者的引用仍然有效,并重新评估以合并到该新流中;
  • 缓存上次合并的结果,以便新使用者将在订阅时收到上次发出的值。

基本上,我需要"trimToActiveOnly"功能。

function getStream(all$, name) {
  return all$
  .filter(x => x.name === name)
    .map(x => x.stream)
    .map(trimToActiveOnly) // in this way, should be dynamically re-evaluated for all
                 // consumers every time new stream is published or stream ends,
                 // not just run once when the stream is first 'got'
    .flatMapLatest(x => Rx.Observable.merge(x)) // where x is all currently active streams for a particular name, with finished/errored ones discarded
    .publish().refCount(); //this is re-evaluated when a new stream is published or when one of the aggregated streams concludes. So the aggregate stream itself never concludes but may go cold if nothing is subscribed.
}
// desired behavior as followed
const publishStream$ = new Rx.Subject();
const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');
const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');
const fooSub = foo$.subscribe(x => console.log('foo: ' + x)); // should receive cached 'world'
const barSub = bar$.subscribe(x => console.log('bar: ' + x)); // should receive cached 'testing'
publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
barSourceB$.onNext('123'); // barSub should now receive '123' as the aggregated active streams are dynamically re-evaluated on each publish of a new stream!

我在这里也有一个JSBin。

它并不完全相同,但这与 Rx 有相似之处:一个类似 zip 的运算符,在其中一个流结束后继续?

你有Observable<{name, <Observable>}>.您可以对此应用filter,以将其减少为仅满足特定条件的子流。然后,您可以使用map来排除名称get到Observable<Observable>,然后从那里您可以使用任何您喜欢的简化来组合值(例如 zip按索引匹配,combineLatest在子流产生值时重新发出某种聚合,或者flatMapLatest扁平化为单个流(。我想你会想在链接中使用类似active转换的东西,因为你需要踢出已完成的流,因为它们会干扰许多组合方法。

例如,请考虑以下方法:

function getStream(all$, name) {
  return active(all$.filter(x => x.name === name).map(x => x.stream))
    .flatMapLatest(x => Rx.Observable.merge(x));
}

从这里,您可以像这样设置:

const publishStream$ = new Rx.Subject();
const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');

然后,您可以订阅:

const fooSub = foo$.subscribe(x => console.log('foo: ' + x));
const barSub = bar$.subscribe(x => console.log('bar: ' + x));

你可以推送数据(你不会真正使用主题,这只是一个例子(:

const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');
barSourceB$.onNext('123');

如果我正确理解了你想要什么,这将完成这项工作。为了解释这里发生的事情,getStream获取包含名称和流的对象流,仅筛选出具有指定名称的对象,丢弃名称并仅保留流,并通过链接答案中的active函数运行它,这将起到扫描缩减的作用。结果将是一个流数组流,随后我们使用 flatMapLatestmerge 将其平展为单个流。这意味着给定名称的所有流将聚合到单个流中,可以根据需要订阅该流。

这是整个工作:https://jsbin.com/liwiga/2/edit?js,console,output

需要注意的是,如果馈送冷可观察量,则flatMapLatestmerge的输出active将无法正常工作(因为每次发出活动可观察量数组时,订阅都会重新开始(。