如何获取RxJS Subject或Observable的当前值

How to get current value of RxJS Subject or Observable?

本文关键字:Observable Subject RxJS 何获取 获取      更新时间:2023-09-26

我有一个Angular 2服务:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';
@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

一切都很好。但我有另一个组件,它不需要订阅,只需要在某个时间点获得isLoggedIn的当前值。我该怎么做?

SubjectObservable没有当前值。当一个值被发出时,它被传递给订阅者,并用它完成Observable

如果您想要一个当前值,请使用专门为此目的设计的BehaviorSubjectBehaviorSubject保留最后发出的值,并立即将其发出给新的订阅者。

它还有一个方法getValue()来获得当前值。

应该从"Observable/Subject"中获取值的唯一方法是订阅

如果您使用getValue(),那么您正在做声明性范式中的命令式操作。它是一个逃生舱口,但99.9%的时候你不应该使用getValue()getValue()会做一些有趣的事情:如果主题被取消订阅,它会抛出一个错误,如果主题因为出错而死亡,它会阻止你获得值,等等。但是,在极少数情况下,它也是一个逃生通道。

有几种方法可以以"Rx-y"的方式从主题或可观测对象中获取最新值:

  1. 使用BehaviorSubject:但实际上订阅了它。当您第一次订阅BehaviorSubject时,它将同步发送以前接收或初始化的值
  2. 使用ReplaySubject(N):这将缓存N值并将其重播给新订阅者
  3. A.withLatestFrom(B):当可观测A发射时,使用此运算符可从可观测的B中获取最新值。将为您提供数组[a, b]中的两个值
  4. A.combineLatest(B):每次AB发射时,使用此运算符从AB获取最新值。将为您提供数组中的两个值
  5. shareReplay():通过ReplaySubject进行可观察的多播,但允许您在出现错误时重试可观察的。(基本上,它为您提供了充满希望的缓存行为)
  6. publishReplay()publishBehavior(initialValue)multicast(subject: BehaviorSubject | ReplaySubject)等:利用BehaviorSubjectReplaySubject的其他运营商。同一件事的不同风格,它们基本上是通过一个主题集中所有通知来多播可观察的源。您需要调用connect()来订阅主题为的源

我也遇到过类似的情况,在Subject的值到达后,后期订阅者订阅了它。

我发现ReplaySubject类似于BehaviorSubject,在这种情况下很有魅力。这里有一个链接可以更好地解释:http://reactivex.io/rxjs/manual/overview.html#replaysubject

const observable = of('response')
function hasValue(value: any) {
  return value !== null && value !== undefined;
}
function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}
const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

您可以从这里查看关于如何实现它的完整文章。https://www.imkrish.com/blog/development/simple-way-get-value-from-observable

我在子组件中遇到了同样的问题,最初它必须具有Subject的当前值,然后订阅Subject以听取更改。我只是在服务中保持当前值,以便组件可以访问,例如:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';
@Injectable()
export class SessionStorage extends Storage {
  isLoggedIn: boolean;
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

需要当前值的组件可以从服务中访问它,即:

sessionStorage.isLoggedIn

不确定这是否是正确的做法:)

一个类似的答案被否决。但我认为我可以证明我在有限的情况下提出的建议是合理的。


虽然一个可观察对象确实没有当前值,但它通常会有立即可用的值。例如,使用redux/flux/akita存储,您可以根据大量可观测值从中央存储请求数据,并且该值通常会立即可用。

如果是这种情况,那么当您subscribe时,该值将立即返回。

假设您有一个服务调用,完成后,您想从您的商店中获取可能不会发出的东西的最新价值:

你可能会尝试这样做(你应该尽可能地把东西放在"管道里"):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {
                        // we have serviceCallResponse and customer 
                     });

这样做的问题是,它会阻塞,直到第二个可观测到的物体发出一个值,而这个值可能永远不会发出。

我最近发现,只有当一个值立即可用时,我才需要评估一个可观察的,更重要的是,我需要能够检测到它是否不可用。我最终做了这个:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {
                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;
                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);
                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();
                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

注意,对于以上所有内容,我使用subscribe来获取值(正如@Ben所讨论的)。不使用.value属性,即使我有BehaviorSubject

虽然这听起来有些过头了,但这只是另一个"可能"的解决方案,可以保持可观察类型并减少样板

您可以始终创建扩展getter来获取Observable的当前值。

为此,您需要在global.d.ts打字员声明文件中扩展Observable<T>接口。然后在observable.extension.ts文件中实现扩展getter,最后将打字员和扩展文件都包含到应用程序中。

您可以参考这个StackOverflow答案来了解如何将扩展包含到您的Angular应用程序中。

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}
// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});
// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });

有两种方法可以实现这一点。

  1. BehaviorSubject有一个方法getValue(),您可以在特定时间点获取该值。

  2. 您可以直接使用BehaviorSubject进行订阅,也可以将订阅的值传递给类成员、字段或属性。

我不推荐这两种方法。

在第一种方法中,这是一种方便的方法,您可以随时获取值,您可以将其称为该时间点的当前快照。这样做的问题是,您可以在代码中引入竞争条件,您可能会在许多不同的地方和不同的时间调用此方法,这很难调试。

第二种方法是大多数开发人员在订阅时想要原始值时使用的方法,你可以跟踪订阅,什么时候取消订阅以避免进一步的内存泄漏,如果你真的很想把它绑定到一个变量,并且没有其他方法来接口它,你可以使用这种方法。

我建议,再看看你的用例,你在哪里使用它?例如,当您调用任何API时,您想确定用户是否已登录,您可以将其与其他可观察性相结合:

const data$ = apiRequestCall$().pipe(
 // Latest snapshot from BehaviorSubject.
 withLatestFrom(isLoggedIn),
 // Allow call only if logged in.
 filter(([request, loggedIn]) => loggedIn)
 // Do something else..
);

这样,在角度的情况下,可以通过管道data$ | async将其直接用于UI。

可以创建订阅,然后在获取第一个发出的项目后销毁。在下面的例子中,pipe()是一个函数,它使用一个Observable作为输入,并返回另一个Observer作为输出,而不修改第一个Observble。

使用Angular 8.1.0"rxjs": "6.5.3""rxjs-observable": "0.0.7" 创建的样本

  ngOnInit() {
    ...
    // If loading with previously saved value
    if (this.controlValue) {
      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });
    }
  }

您可以将上次发出的值与Observable分开存储。然后在需要时阅读。

let lastValue: number;
const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);

最好的方法是使用Behaviur Subject,下面是一个示例:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5

另一种方法,如果你想/可以使用异步等待(必须在异步函数内部),你可以使用现代Rxjs:

 async myFunction () {
     const currentValue = await firstValueFrom(
      of(0).pipe(
        withLatestFrom(this.yourObservable$),
        map((tuple) => tuple[1]),
        take(1)
      )
    );
    // do stuff with current value
 }
 

这将发出一个值"0";立即"因为withLatestFrom,然后将解决承诺。

可观察对象只是函数。它们返回一个值流,但为了查看这些值,您需要订阅它们。最基本的方法是对observable调用subscribe()方法,并传入observatornext()回调作为参数。

或者,您可以使用Subscriber,它是Observer的一种实现,但为您提供了诸如unsubscribe之类的附加功能。

import { Subscription } from 'rxjs';
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  privat _subs = new Subscription();
  isLoggedIn$ = this._isLoggedInSource.asObservable();
  isLoggedIn = false;
  constructor() {
    super('session');
    this._subs.add(this.isLoggedIn$.subscribe((value) => this.isLoggedIn = v))
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

然后在另一个组件中,可以导入SessionStorage并访问isLoggedIn的值。

注意:我将使用new Observable()构造函数将_isLoggedInSource的数据封装在可观察的对象中。