如何从AsyncSubject(消费者模式)中准确地订阅一次元素

How to subscribe exactly once to an element from AsyncSubject (consumer pattern)

本文关键字:元素 一次 AsyncSubject 消费者 模式      更新时间:2023-09-26

rxjs5中,我有一个AsyncSubject并希望多次订阅它,但只有一个订阅者应该接收next()事件。所有其他的(如果他们还没有取消订阅)应该立即得到complete()事件,没有next()

的例子:

let fired = false;
let as = new AsyncSubject();
const setFired = () => {
    if (fired == true) throw new Error("Multiple subscriptions executed");
    fired = true;
}
let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);
// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered
setTimeout(() => {
    as.next(undefined);
    as.complete();
}, 500);

您可以通过编写一个小类来包装初始的AsyncSubject

来轻松实现这一点
import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'
class SingleSubscriberObservable<T> {
    private newSubscriberSubscribed = new Subject();
    constructor(private sourceObservable: Observable<T>) {}
    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        this.newSubscriberSubscribed.next();
        return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
    }
}

你可以在你的例子中试试:

const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)
let fired = false;
function setFired(label:string){
    return ()=>{
        if(fired == true) throw new Error("Multiple subscriptions executed");
        console.log("FIRED", label);
        fired = true;
    }
}
function logDone(label: string){
    return ()=>{
       console.log(`${label} Will stop subscribing to source observable`);
    }
}
const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));
setTimeout(()=>{
    as.next(undefined);
    as.complete();
}, 500)

这里的关键是这一部分:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    this.newSubscriberSusbscribed.next();
    return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}

每当有人呼叫订阅时,我们将向newSubscriberSubscribed主题发送信号。

当我们订阅底层的Observable时,我们使用

takeUntil(this.newSubscriberSubscribed)

这意味着当下一个用户调用时:

this.newSubscriberSubscribed.next()

先前返回的可观察对象将完成。

因此,这将导致您所要求的结果,即每当出现新订阅时,以前的订阅完成。

应用程序的输出将是:

First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable

编辑:

如果你想让第一个订阅的订阅保持订阅,所有未来的订阅都立即完成(这样当最早的订阅者仍然订阅时,其他人无法订阅)。你可以这样做:

class SingleSubscriberObservable<T> {
    private isSubscribed: boolean = false;
    constructor(private sourceObservable: Observable<T>) {}
    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        if(this.isSubscribed){
            return Observable.empty().subscribe(next, error, complete);    
        }
        this.isSubscribed = true;
        var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
        return new Subscription(()=>{
            unsubscribe.unsubscribe();
            this.isSubscribed = false;
        });
    }
}

我们保留一个标志this.isSusbscribed来跟踪是否有人当前订阅。我们还返回一个自定义订阅,可用于在取消订阅时将此标志设置回false。

每当有人尝试订阅时,如果我们将他们订阅到一个空的Observable,它将立即完成。输出如下所示:

Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable

简单的方法是将AsyncSubject包装在另一个对象中,该对象只处理调用1个订阅者的逻辑。假设您只想调用第一个订阅者,下面的代码应该是一个很好的起点

let as = new AsyncSubject();
const createSingleSubscriberAsyncSubject = as => {
    // define empty array for subscribers
    let subscribers = [];
    const subscribe = callback => {
        if (typeof callback !== 'function') throw new Error('callback provided is not a function');
        subscribers.push(callback);
        // return a function to unsubscribe
        const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
        return unsubscribe;
    };
    // the main subscriber that will listen to the original AS
    const mainSubscriber = (...args) => {
        // you can change this logic to invoke the last subscriber
        if (subscribers[0]) {
            subscribers[0](...args);
        }
    };
    as.subscribe(mainSubscriber);
    return {
        subscribe,
        // expose original AS methods as needed
        next: as.next.bind(as),
        complete: as.complete.bind(as),
    };
};
// use
const singleSub = createSingleSubscriberAsyncSubject(as);
// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));
// remove first subscriber
unsub1();
setTimeout(() => {
    as.next(undefined);
    as.complete();
    // only 'subscriber 2' is printed
}, 500);