RxJS 5可观察对象的延迟模式

Deferred pattern with RxJS 5 observables

本文关键字:延迟 模式 对象 观察 RxJS      更新时间:2023-09-26

对于任意的承诺实现,延迟模式(不要与反模式混淆)可能看起来像:

const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });

deferred对象保存未解决的承诺,可以通过引用传递给其他函数作用域。所有的承诺链都会在承诺结算时执行,无论deferred.promise是在then链之前还是之后结算。

承诺状态已定,不可更改。

由答案可知,初始选择为ReplaySubjectAsyncSubject

对于给定的设置(演示)

var subject = new Rx.AsyncSubject;
var deferred = subject.first();
deferred.subscribe(
  console.log.bind(console, 'Early result'),
  console.log.bind(console, 'Early error')
);
setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Late result'),
    console.log.bind(console, 'Late error')
  );
});

这会产生理想的行为:

subject.error('one');
subject.next('two');

早期错误1

延迟错误1

这会导致不良行为:

subject.error('one');
subject.next('two');
subject.complete();

早期错误1

后期结果二

这会导致不良行为:

subject.next('two');
subject.complete();
subject.next('three');

早期结果二

期末成绩三

ReplaySubject的结果不同,但仍与预期结果不一致。next值和error错误被分开处理,complete不阻止观察者接收新数据。这可能适用于单个next/error,问题是nexterror可能被无意中调用多次。

使用first()的原因是因为subscribe s是一次性订阅,我想删除它们以避免泄漏。

如何用RxJS可观察对象实现?

您可能正在寻找Rx.ReplaySubject(1)(或Rx.AsyncSubject()取决于您的用例)。

有关主题的更详细解释,请参见不同RxJS主题的语义是什么?

基本上,主题可以通过引用传递,就像deferred一样。您可以向它发出值(resolve将是'next' (Rxjs v5)或'onNext' (Rxjs v4),然后是'complete''onCompleted()'),只要您持有该引用。

一个主题可以有任意数量的订阅者,类似于延迟的then。如果您使用replaySubject(1),则任何订阅者都将收到最后发出的值,该值应该响应您的it doesn't matter if deferred.promise was settled before chaining with then or after.。在Rxjs v4中,replaySubject将在完成订阅后向订阅者发送它的最后一个值。我不确定Rxjs v5中的行为。

  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/asyncsubject.md
  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md

更新

使用Rxjs v4执行以下代码:

var subject = new Rx.AsyncSubject();
var deferred = subject;
deferred.subscribe(
  console.log.bind(console, 'First result'),
  console.log.bind(console, 'First error')
);
setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Second result'),
    console.log.bind(console, 'Second error')
  );
});
subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');

产生以下输出:

First result one
Second result one

但是,在Rxjs v5中执行的相同代码不会:

First result one
Second result four

所以基本上这意味着主题的语义在Rxjs v5中发生了变化!这确实是一个需要注意的突破性变化。无论如何,您可以考虑回到Rxjs v4,或者使用artur grzesiak在他的回答中建议的转变。你也可以在github网站上提交一个问题。我相信这一改变是有意为之,但事实并非如此,提出这一问题可能有助于澄清情况。在任何情况下,无论选择什么行为,都应该正确地记录。

关于主题语义的问题以一个链接为特征,该链接显示了与多个和延迟订阅相关的异步主题

正如@user3743222所写的AsyncSubject可能在deferred实现中使用,但问题是它必须是private并防止多个resolve s/reject s。

下面是一个可能的实现镜像resolve-reject-promise结构:

const createDeferred = () => {
  const pending = new Rx.AsyncSubject(); // caches last value / error
  const end = (result) => {
    if (pending.isStopped) {
      console.warn('Deferred already resloved/rejected.'); // optionally throw
      return;
    }
    
    if (result.isValue) {
      pending.next(result.value);
      pending.complete();
    } else {
      pending.error(result.error);
    }
  }
  return {
    resolve: (value) => end({isValue: true, value: value }),
    reject: (error) => end({isValue: false, error: error }),
    observable: pending.asObservable() // hide subject
  };
}
// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));
// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
  def.resolve(1);
  setTimeout(() => {
    def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
    def.resolve(2); // warn
    def.reject('err'); // warn
  }, 1000)
}, 1000);
// async error example
const def3 = createDeferred();
def3.observable.subscribe(
  (n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
  (err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
  def3.reject('ERR');
  setTimeout(() => {
    def3.observable.subscribe(
      (n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
      (err) => console.error('ERROR-AFTER-REJECTED', err));
    def3.resolve(2); // warn
    def3.reject('err'); // warn
  }, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>