用rxjs通过阈值来压平数值序列

Flatten sequence of numeric values by threshold with rxjs

本文关键字:rxjs 阈值      更新时间:2023-09-26

使用rxjs,我得到了一个可观察的浮点数序列。现在,我想过滤掉流中较小的变化,只在值比以前发出的值大一定量的情况下发出值。

换句话说:总是发射序列中的第一个值。然后,每个发射的(=未滤波的)值应至少比之前的发射的值大delta。任何与该条件不匹配的值都将被筛选。

我已经提出了一个解决方案,做我想做的如上所述:

var obs = Rx.Observable.create(function(observer) {
  /* ... */
});
var last;
obs.map(function(value) {
  if (last === undefined) {
    last = value;
    return value;
  } else {
      var threshold = 0.5,
          delta = Math.abs(last - value);
      if (delta > threshold) {
        last = value;
        return value;
      }
      else {
        return undefined;
      }
    }
  }).
  filter(function(value) {
    return value !== undefined;
  });

我是rxjs和反应式编程的新手,我认为上面的解决方案过于复杂。更重要的是,它违反了反应式编程的原则,即不要在组合管道外保存状态。但我会这样做,因为我通过last变量进行跟踪,并希望去掉它。

我如何解决这个问题并以被动的方式进行?

您可以使用scan来管理您的状态:

var filtered = obs.scan({}, function (acc, value) {
    if (acc.value !== undefined) {
        var threshold = 0.5,
            change = Math.abs(acc.value - value);
        if (change < threshold) {
            return { value: acc.value, isValid: false };
        }
    }
    return { value: value, isValid: true };
})
.filter(function (acc) { return acc.isValid; })
.map(function (acc) { return acc.value; });

我知道这个问题已经得到了答案,但如果你发现自己经常这样做,你可以自己动手。只是吐口水,因为这是一个有趣的实用程序:

Observable.prototype.maxThreshold = function(threshold, selector) {
  var last = null;
  return this.filter(function(x) {
    var n = selector ? selector(x) : x;
    var delta = Math.abs(n - last);
    if(last === null || delta > threshold) {
      last = n;
      return true;
    }
    return false;
  });
});

可按如下方式使用:

streamOfNumbers.maxThreshold(0.5).
  subscribe(function(x) {
    console.log(x);
  });

streamOfObjects.maxThreshold(0.5, function(x) { return x.value; }).
  subscribe(function(x) {
    console.log(x);
  });