RxJS异步请求更新

RxJS Asynchronous Request Update

本文关键字:更新 请求 异步 RxJS      更新时间:2024-03-30

所以,多亏了RxJS,我现在正在学习反应性思考。现在,我正在读一本RxJS书(RxJS的反应式编程),我读过关于AsyncSubject以及它如何只缓存最后一个接收到的值。我想知道的是,如果我想更新服务器并进行可观察到的更新,会发生什么?既然onComplete已经被调用,我需要创建一个全新的观察者吗?还有其他模式我应该遵循吗?

我的总体要求是,我希望有一种方法可以干净地将数据传递到服务器和从服务器传递数据,并始终保持我的可观察(模型)新鲜。

谢谢,Lee

正如您所怀疑的,在调用onCompleted之后,您无法更改AsyncSubject的值。处理简单的"根据需要调用服务器,在调用之间缓存值"场景的典型方法是使用flatMapLatest将可观察到的触发器映射到表示服务器调用的AsyncSubject。例如,如果你想每30秒刷新一次数据,你可以这样做:

const subscription = Rx.Observable
   .interval(30000)
   .flatMapLatest(() => serverCall())
   .subscribe(x => doStuffWithResult(x));

在您的情况下,我会考虑使用Websockets,因为您想要your observable (model) fresh at all times

类似的东西

var source = Rx.Observable.create(function (observer) {
   websocket.onmessage = function( msg ) {
      observer.onNext( msg );
   }
   websocket.onerror = function( error ) {
      observer.onError( error );
   }
   websocket.onclose = function ( msg ) {
      observer.onComplete( msg );
   }
});

否则,您可以使用间隔

const subscription = Rx.Observable
   .interval(1000)
   .flatMapLatest(() => Rx.Observable.fromPromise(fetch( ... ).then( response => response.json() ).retry(5))
   .subscribe( response => response );

如果出现错误,您可以始终使用retry运算符不立即放弃。