RxJS 在处理下一个缓冲块之前等待 ajax respose

RxJS await ajax respose before processing next buffered chunk

本文关键字:等待 ajax respose 处理 下一个 缓冲 RxJS      更新时间:2023-09-26

我正在使用带有基本主题对象的RxJS,该对象在未知时间从不同位置接收输入。这些输入需要堆叠并异步发送到服务器 - 但我需要等到 ajax 请求完成,然后再尝试处理下一组缓冲的输入。

flatMapLatest 不适合我的情况,因为每个响应都必须唯一处理。

我可以在处理 ajax 请求时暂停缓冲区,但这意味着我会丢失输入,这是不可接受的。

我看到缓冲区打开对象只是一个可观察对象,我可以以某种方式组合一个计时器可观察对象和一个自定义可观察对象,当我的 ajax 请求完成时触发,当 ajax 请求发送时,我可以暂停计时器?

抱歉,我

还没有要显示的任何代码,我真的还处于设计阶段。

我想做的事情是可行的还是我做错了?

编辑

这是我到目前为止的代码,它似乎做了我想要的 - 但仍然对改进持开放态度。

var bufferedIntervalPauser = new Rx.Subject<boolean>();
var pausableInterval = Rx.Observable.interval(500).pausable(bufferedIntervalPauser);
var purStream = this.updateReqestSubject.asObservable()
    .buffer(pausableInterval)
    .where(b => (b || []).length > 0)
var purSub = purStream.subscribe(
    next => {
        bufferedIntervalPauser.onNext(false); // pause the buffer window
        SomeAjaxMethod(next, {
            success: res => {
                this.HandleResult(res);
            },
            always: () => {
                // when the ajax completes, resume the buffer
                bufferedIntervalPauser.onNext(true);
            }
        });
    },
    err => {
        console.error(err);
    });
// start
bufferedIntervalPauser.onNext(true);

您应该只能使用 buffer 运算符。正如@Enigmativity指出的那样,Rx 有一个序列化协议,因此您的处理程序永远不会同时运行,即当它仍在从上一个调用运行时不会调用它。但是,如果您的处理程序本身调用非阻塞/asnyc 方法,则在创建某些异步响应之前,不会阻止 Rx 调用您的处理程序。在这种情况下,您可能希望以可观察的顺序包装异步调用。然后你可以使用像 concat 这样的运算符来保证多个可观察序列的序列化

编辑:您在此处的问题是您在订阅中执行异步工作。你需要做的是将你的SomeAjaxMethod视为一个可观察的序列,然后只使用合成。

//Psuedo code as I dont have a Java IDE at hand
this.updateReqestSubject.asObservable()
   .Buffer(500ms)
   .Select(buffer=>SomeAjaxMethod(buffer).asObservable().Catch(/*Swallow to match example*/))
   .Concat() //Concatenate the IObservable<IObservable<>> into IObservable<>
   .Subscribe(
     res=>this.HandleResult(res),
     err => {
       console.error(err);
     });