Rxjs Duplex Streams

Rxjs Duplex Streams

本文关键字:Streams Duplex Rxjs      更新时间:2023-09-26

RxJs是否有任何方法可以创建一个Observable/Observer对象,该对象具有接收事件和发出事件的逻辑,类似于Rx.Subject,除了不组合Observable和Observer之外,因此当事件从Observable的主体内部发出时,它们不由Observer主体函数处理。

类似这样的东西:

var someSystem = /* some system that receives and emits events */;
function someFn() {
  return ObservableObserver((observer, observable) => {
    observable.subscribe((e) => someSystem.emit(e));
    someSystem.on("data", (d) => observer.onNext(d));
  });
}
const mySystem = someFn();
mySystem.subscribe(
  (e) => {
    console.log("Received from 'someSystem':", e);
    mySystem.onNext("sending this to 'someSystem'");
  }
);

编辑:

我相信我正在寻找的是类似于Node.js双工流:

https://nodejs.org/api/stream.html#stream_class_stream_duplex

如果我遵循您的要求,您可以使用Subject#create方法:

function someFn() {
  return Subject.create(
    Observer.create((e) => someSystem.emit(e)),
    Observable.fromEvent(someSystem, 'data')
  );
}
const mySystem = someFn()
source.subscribe(mySystem);
mySystem.subscribe((e) => {
  console.log("Received from 'someSystem':", e);
});