合并可观察量 A 和 B:仅当 A 有数据时才应触发;如果数据或默认值,则从 B 获取

Combine Observables A and B: Should only trigger if A has data; take from B if data or default

本文关键字:数据 获取 如果 则从 默认值 观察 合并 仅当      更新时间:2023-09-26

我有一个应用程序通过串行端口与设备通信。每个发送的命令都由包含状态/答案的数据事件应答。基本上有更改设备的命令和仅返回状态的命令。每次应答最后一个命令时(因此在接收数据时),应用应发送下一个命令或默认查询状态。我正在尝试使用 rxjs 对此进行建模。我的想法是,有一个命令可观察和一个从数据事件派生的数据可观察。这两者应该以这样的方式组合,即生成的可观察量仅在有数据时发出值,并将其与命令或默认命令(请求状态)组合在一起,如果没有命令从命令流中下来。

data:     ---------d---d-----d---------d------d-------
command:  --c1---c2----------------------c3-----------
______________________________________________________
combined  ---------c1--c2----dc--------dc-----c3

dc 是默认命令。此外,不应丢失任何命令。

目前,我

有一个匿名主题的实现,我自己实现了可观察和观察者。从数组中的命令流收集命令,订阅数据事件,使用 onNext 手动发布数据,并从数组或默认命令发送下一个命令。这有效,但我觉得这可以用 rxjs 更优雅地表达。

一种方法是使用单独的default_command流,每 100 毫秒重复一次默认命令。它与命令流合并,然后与数据流一起压缩。这里的问题是合并的命令流,因为它堆积了默认命令,但默认命令应该只在没有其他命令的情况下应用。

我唯一能想到的就是:

  • 订阅命令流并将结果排队(在数组中)
  • 将映射操作应用于将从队列中提取的数据流(如果队列为空,则使用 default)。

我们可以将其包装成一个通用的可观察运算符。 我不擅长名字,所以我把它称为zipWithDefault

Rx.Observable.prototype.zipWithDefault = function(bs, defaultB, selector) {
  var source = this;
  return Rx.Observable.create(function(observer) {
    var sourceSubscription = new Rx.SingleAssignmentDisposable(),
      bSubscription = new Rx.SingleAssignmentDisposable(),
      subscriptions = new Rx.CompositeDisposable(sourceSubscription, bSubscription),
      bQueue = [],
      mappedSource = source.map(function(value) {
        return selector(value, bQueue.length ? bQueue.shift() : defaultB);
      });
    bSubscription.setDisposable(bs.subscribe(
      function(b) {
        bQueue.push(b);
      },
      observer.onError.bind(observer)));
    sourceSubscription.setDisposable(mappedSource.subscribe(observer));
    return subscriptions;
  });
};

并像这样使用它:

combined = dataStream
  .zipWithDefault(commandStream, defaultCommand, function (data, command) {
    return command;
  });

我认为sample运算符将是你最好的选择。 不幸的是,它没有内置的默认值,因此您必须从现有运算符中滚动自己的默认值:

Rx.Observable.prototype.sampleWithDefault = function(sampler, defaultValue){
var source = this;
return new Rx.AnonymousObservable(function (observer) {
      var atEnd, value, hasValue;
      function sampleSubscribe() {
          observer.onNext(hasValue ? value : defaultValue);
          hasValue = false;
      }
      function sampleComplete() {
          atEnd && observer.onCompleted();
      }
      return new Rx.CompositeDisposable(
        source.subscribe(function (newValue) {
          hasValue = true;
          value = newValue;
        }, observer.onError.bind(observer), function () {
          atEnd = true;
        }),
        sampler.subscribe(sampleSubscribe, observer.onError.bind(observer), sampleComplete)
      );
    }, source);
}

您可以使用 controlled 运算符实现排队行为。 因此,您的最终数据链如下所示:

var commands = getCommandSource().controlled();
var pipeline = commands
  .sampleWithDefault(data, defaultCommand)
  .tap(function() { commands.request(1); });

下面是一个完整的示例:

Rx.Observable.prototype.sampleWithDefault = function(sampler, defaultValue) {
  var source = this;
  return new Rx.AnonymousObservable(function(observer) {
    var atEnd, value, hasValue;
    function sampleSubscribe() {
      observer.onNext(hasValue ? value : defaultValue);
      hasValue = false;
    }
    function sampleComplete() {
      atEnd && observer.onCompleted();
    }
    return new Rx.CompositeDisposable(
      source.subscribe(function(newValue) {
        hasValue = true;
        value = newValue;
      }, observer.onError.bind(observer), function() {
        atEnd = true;
      }),
      sampler.subscribe(sampleSubscribe, observer.onError.bind(observer), sampleComplete)
    );
  }, source);
}
var scheduler = new Rx.TestScheduler();
var onNext = Rx.ReactiveTest.onNext;
var onCompleted = Rx.ReactiveTest.onCompleted;
var data = scheduler.createHotObservable(onNext(210, 18), 
                                         onNext(220, 17),
                                         onNext(230, 16),
                                         onNext(250, 15),
                                         onCompleted(1000));
var commands = scheduler.createHotObservable(onNext(205, 'a'), 
                                             onNext(210, 'b'), 
                                             onNext(240, 'c'), 
                                             onNext(400, 'd'), 
                                             onCompleted(800))
  .controlled(true, scheduler);
var pipeline = commands
  .sampleWithDefault(data, 'default')
  .tap(function() {
    commands.request(1);
  });
var output = document.getElementById("output");
pipeline.subscribe(function(x) {
  
  var li = document.createElement("li");
  var text = document.createTextNode(x);
  
  li.appendChild(text);
  output.appendChild(li);
    
  
});
commands.request(1);
scheduler.start();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.testing.js"></script>
<div>
  
  <ul id="output" />
  
</div>

这可以通过使用扫描功能来解决。在累积值中,存储尚未收到任何数据响应的命令。

var result = Rx.Observable
   .merge(data, command)
   .scan(function (acc, x) {
        if (x === 'd') {
            acc.result = acc.commands.length > 0 ? acc.commands.shift() : 'dc';
        } else {
            acc.result = '';
            acc.commands.push(x);
        }
        return acc;
   }, {result: '', commands: []})
   .map(function (x) {
      return x.result;
   })
   .filter(function (x) {
       return x !== '';
   });

请在此处找到完整的更多详细信息:http://jsbin.com/tubade/edit?html,js,console