检测异步系统中的结束事件

Detect end event in async system

本文关键字:结束 事件 异步 系统 检测      更新时间:2023-09-26

我正在用RxJS编写一些后端的东西。当一切都完成后,我需要提出一些事件。

1)建筑涉及Subject onCompleted所以事件永远不会自然发生。它可以手动发出,但要做到这一点,我需要依赖一些 END 事件,该事件...循环循环。

2)架构具有跟踪待处理任务的pending$ Observable。不幸的是,由于系统的异步性质,这种状态可以被清空几次,所以它的"空"本身不能用作 END 指示器。

这是我的解决方案,这是一个相当黑客的解决方案,因为它需要明确的间隔常量。

import {Observable, Subject} from "rx";
let pendingS = new Subject();
let pending$ = pendingS
  .startWith(new Set())
  .scan((memo, [action, op]) => {
    if (action == "+") {
      memo.add(op);
      return memo;
    } else if (action == "-") {
      memo.delete(op);
      return memo;
    } else {
      return memo;
    }
  }).share();
let pendingDone$ = pending$ // a guess that
  .debounce(2000) // if nothing happens within 2 sec window
  .filter(pending => !pending.size); // and nothing is pending => we're done
pendingDone$.subscribe(function () {
  console.log("pendingDone!");
});
setTimeout(function () {
  pendingS.onNext(["+", "op1"]);
  pendingS.onNext(["+", "op2"]);
  pendingS.onNext(["+", "op3"]);
  pendingS.onNext(["-", "op2"]);
  pendingS.onNext(["-", "op1"]);
  pendingS.onNext(["-", "op3"]);
}, 100);
setTimeout(function () {
  pendingS.onNext(["+", "op4"]);
  pendingS.onNext(["-", "op4"]);
}, 500);

这个(非常普遍的)问题有更优雅的解决方案吗?

到目前为止,我已经找到了 3 种通往 subj 的方法。

1) "空闲超时"

// next event
let pendingDone$ = pending$
  .debounce(2000)
  .filter(pending => !pending.size);
// next + completed events
let pendingDone$ = pending$
  .filter(pending => !pending.size)
  .timeout(2000, Observable.just("done"));

2)"同步设置,异步未设置"

如果挂起删除挂起

到下一个事件循环迭代,则下一个挂起任务应在此之前,删除"挂起为空"时态条件。

pendingS.onNext(["+", "op"]);
// do something
setImmediate(() => 
  pendingS.onNext(["-", "op"]);
);
pendingDone$
  .filter(state => !state.size)
  .skip(1); // skip initial empty state

3) 调度程序 (?

使用调度程序似乎可以实现类似的结果,但这可能是一种非常规的用法,所以我不打算深入研究这个方向。