如何确保对观察者的订阅调用最初接收到最新的值

How do I ensure that a call to subscribe on the observer initially receives the most recent value?

本文关键字:调用 最新 何确保 确保 观察者      更新时间:2023-11-06

我将rxjs与Angular 2和Typescript一起使用。我想在多个组件之间共享一个通用的web资源(在我的应用程序上下文中是一个"项目",本质上是一个JSON文档)。为了实现这一点,我引入了一种服务,它公开了一个可观察的,将由所有客户端共享:

/**
 * Handed out to clients so they can subscribe to something.
 */
private _observable : Observable<Project>;
/**
 * Used to emit events to clients.
 */
private _observer : Observer<Project>;
constructor(private _http: Http) {
    // Create observable and observer once and for all. These instances
    // are not allowed to changed as they are passed on to every subscriber.
    this._observable = Observable.create( (obs : Observer<Project>) => {
        this._observer = obs;
    });
}

客户端现在只需获得一个_observable的引用并订阅它

/**
 * Retrieves an observable that always points to the active
 * project.
 */
get ActiveProject() : Observable<Project> {
    return (this._observable);
}

当某个组件决定实际加载项目时,它会调用以下方法:

/**
 * @param id The id of the project to set for all subscribers
 */
setActiveProject(id : string) {
    // Projects shouldn't change while other requests are in progress
    if (this._httpRequest) {
        throw { "err" : "HTTP request in progress" };
    }
    this._httpRequest = this._http.get('/api/project/' + id)
        .catch(this.handleError)
        .map(res => new Project(res.json()));
    this._httpRequest.subscribe(res => {
        // Cache the project
        this._cachedProject = res;
        // Show that there are no more requests
        this._httpRequest = null;
        // Inform subscribers
        this._observer.next(this._cachedProject)
        console.log("Got project");
    });
}

它基本上执行一个HTTP请求,将JSON文档转换为一个"适当"的实例,并调用this._observer.next()来通知所有订阅者有关更改的信息。

但是,如果在之后订阅了,则在发出新的HTTP请求之前,什么都看不到。我发现rxjs中有某种缓存(或重放?)机制似乎可以解决这个问题,但我不知道如何使用它。

tl;dr:如何确保对观察器上subscribe的调用最初接收到最新的值?

额外问题:通过"将观察者从可观察对象中拉出"(在构造函数中),我本质上创建了一个主题吗?

BehaviorSubject就是这样做的

import { BehaviorSubject } from 'rxjs/subject/BehaviorSubject';
...
obs=new BehaviourSubject(4);
obs.subscribe(); //prints 4
obs.next(3); //prints 3
obs.subscribe(); //prints 3

我通常使用shareReplay(1)来实现这一点。使用这个带有1作为参数的运算符将确保最新发出的值将保存在缓冲区中,因此当有新的订阅者时,该值将立即传递给它。您可以查看文档:

var interval = Rx.Observable.interval(1000);
var source = interval
    .take(4)
    .doAction(function (x) {
        console.log('Side effect');
    });
var published = source
    .shareReplay(3);
published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));
// Creating a third subscription after the previous two subscriptions have
// completed. Notice that no side effects result from this subscription,
// because the notifications are cached and replayed.
Rx.Observable
    .return(true)
    .delay(6000)
    .flatMap(published)
    .subscribe(createObserver('SourceC'));
function createObserver(tag) {
    return Rx.Observer.create(
        function (x) {
            console.log('Next: ' + tag + x);
        },
        function (err) {
            console.log('Error: ' + err);
        },
        function () {
            console.log('Completed');
        });
}
// => Side effect
// => Next: SourceA0
// => Next: SourceB0
// => Side effect
// => Next: SourceA1
// => Next: SourceB1
// => Side effect
// => Next: SourceA2
// => Next: SourceB2
// => Side effect
// => Next: SourceA3
// => Next: SourceB3
// => Completed
// => Completed
// => Next: SourceC1
// => Next: SourceC2
// => Next: SourceC3
// => Completed

额外问题:通过"把观察者从可观察的事物中拉出来"(in构造函数),我本质上创建了一个主题吗?

我不知道你的意思是什么,但不是。主体既是观察者又是可观察者,具有特定的语义。正如你所说,仅仅"把观察者从可观察的事物中拉出来"是不够的。关于主题语义,请看这里:不同RxJS主题的语义是什么?