重复/重置可观察量

Repeating/Resetting an observable

本文关键字:观察 重复      更新时间:2023-09-26

我正在使用 rxjs 为智能电视上的遥控器创建一个"频道数字"选择器。这个想法是,当您输入号码时,您会在屏幕上看到它们,并且在您完成输入号码后,用户实际上将被带到该频道。

我使用两个可观察量来实现这一点:

  1. 一个"进度"流,侦听所有数字输入,并在通过扫描运算符输入数字时发出串联的数字字符串。

  2. 一个"已完成"的流,在 n 毫秒未输入任何数字后,将发出已完成的最后一个数字字符串。EG:1-2-3 ->"123"。

这是我用来尝试解决此问题的代码:

通道编号:

module.exports = function (numberKeys, source, scheduler) {
    return function (completedDelay) {
        var toNumericString = function (name) {
                return numberKeys.indexOf(name).toString();
            },
            concat = function (current, numeric) {
                return current.length === 3 ? current : current + numeric;
            },
            live = createPress(source, scheduler)(numberKeys)
                .map(toNumericString)
                .scan(concat, '')
                .distinctUntilChanged(),
            completed = live.flatMapLatest(function (value) {
                return Rx.Observable.timer(completedDelay, scheduler).map(value);
            }),
            progress = live.takeUntil(completed).repeat();
        return {
            progress: progress,
            completed: completed
        };
    };
};

创建新闻:

module.exports = function (source, scheduler) {
    return function (keyName, throttle) {
        return source
            .filter(H.isKeyDownOf(keyName))
            .map(H.toKeyName);
    };
};

创建来源:

module.exports = function (provider) {
    var createStream = function (event) {
        var filter = function (e) {
                return provider.hasCode(e.keyCode);
            },
            map = function (e) {
                return {
                    type: event,
                    name: provider.getName(e.keyCode),
                    code: e.keyCode
                };
            };
        return Rx.Observable.fromEvent(document, event)
            .filter(filter)
            .map(map);
    };
    return Rx.Observable.merge(createStream('keyup'), createStream('keydown'));
};

有趣的是,上面的代码在测试条件下(使用 Rx.TestScheduler 模拟源代码和调度程序)按预期工作。但是在生产中,当调度程序根本没有传递并且 source 是 createPress 的结果(上图)时,进度流只会发出直到完成,然后再也不会发出。就好像重复被完全忽略或冗余一样。我不知道为什么。

我在这里错过了什么吗?

您可以使用 Window。在这种情况下,我会建议使用WindowWithTime。您还可以做更多有趣的事情,例如使用窗口(windowBorders),然后使用Debounce作为边界传递源。

source
  .windowWithTime(1500)
  .flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))

此外,由于我们的窗口是闭合的可观察量,因此我们可以使用 Reduce 来累积窗口中的值并连接我们的数字。


现在,此变体将在 1,5 秒后关闭。相反,我们希望在最后一次按键后等待 x 秒。天真我们可以做source.window(source.debounce(1000))但现在我们订阅了两次我们的源代码,这是我们想要避免的事情,原因有两个。首先我们不知道订阅有什么副作用,其次我们不知道订阅的订单会收到事件。最后一件事不是问题,因为我们使用的去抖动已经在最后一次按键后增加了延迟,但仍需要考虑。

解决方案是发布我们的源代码。为了将发布保留在序列中,我们将其包装到 observable.create 中。

Rx.Observable.create(observer => {
    var ob = source.publish();
    return new Rx.CompositeDisposable(
      ob.window(ob.debounce(1000))
        .subscribe(observer),
      ob.connect());
}).flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))

编辑:或者像这样使用publish

source.publish(ob => ob.window(ob.debounce(1000)))
    .flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))