RxJS,如何轮询API以使用动态时间戳连续检查更新的记录

RxJS, how to poll an API to continuously check for updated records using a dynamic timestamp

本文关键字:时间戳 动态 连续 检查更新 记录 何轮询 API RxJS      更新时间:2023-09-26

我是RxJS的新手,我正在尝试编写一个能完成以下任务的应用程序:

  1. 加载时,发出AJAX请求(为了简单起见,伪造为fetchItems())以获取项目列表
  2. 在那之后的每一秒,发出一个AJAX请求来获取项目
  3. 检查新项目时,只应返回在最近时间戳之后更改的项目
  4. 不应该存在任何可观察性之外的状态

我的第一次尝试很直接,进了1、2和4球。

var data$ = Rx.Observable.interval(1000)
  .startWith('run right away')
  .map(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`, but
    // I am not yet tracking the `modifiedSince` timestamp yet so all items will always be returned
    return fetchItems(); 
  });

现在我很兴奋,这很容易,实现目标3再难不过了…几个小时后,我就在这里:

var modifiedSince = null;
var data$ = Rx.Observable.interval(1000)
  .startWith('run right away')
  .flatMap(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
    return fetchItems(modifiedSince);
  })
  .do(function(item) {
    if(item.updatedAt > modifiedSince) {
      modifiedSince = item.updatedAt;
    }
  })
  .scan(function(previous, current) {
    previous.push(current);
    return previous;
  }, []);

这解决了目标3,但在目标4上倒退。我现在把状态存储在可观察之外。

我假设全局modifiedSince.do()块不是实现这一点的最佳方式。如有任何指导,我们将不胜感激。

编辑:希望能澄清我想问的问题。

这里是另一个不使用闭包或"外部状态"的解决方案。

我做了以下假设:

  • fetchItems返回项的Rx.Observable,即不是项的数组

它使用了expand运算符,该运算符允许发出遵循x_n+1 = f(x_n)类型递归关系的值。通过返回一个发出该值的可观察对象(例如Rx.Observable.return(x_n+1))来传递x_n+1,然后通过返回Rx.Observable.empty()来完成递归。在这里,你似乎没有一个结束条件,所以这将永远持续下去。

scan还允许发出遵循递归关系的值(x_n+1 = f(x_n, y_n))。不同之处在于,scan强制您使用同步函数(因此x_n+1y_n同步),而对于expand,您可以使用可观测形式的异步函数。

代码没有经过测试,所以如果这有效与否,请随时更新。

相关文档:扩展、组合最新

var modifiedSinceInitValue = // put your date here
var polling_frequency = // put your value here
var initial_state = {modifiedSince: modifiedSinceInitValue, itemArray : []}
function max(property) {
  return function (acc, current) {
    acc = current[property] > acc ? current[property] : acc;
  }
}    
var data$ = Rx.Observable.return(initial_state)
  .expand (function(state){
             return fetchItem(state.modifiedSince)
                   .toArray()
                   .combineLatest(Rx.Observable.interval(polling_frequency).take(1), 
                     function (itemArray, _) {
                       return {
                         modifiedSince : itemArray.reduce(max('updatedAt'), modifiedSinceInitValue), 
                         itemArray : itemArray
                       }
                     }
  })

您的意思似乎是modifiedSince是您携带的状态的一部分,所以它应该出现在scan中。为什么不把do中的动作也移到扫描中呢?。那么你的种子就是{modifiedSince: null, itemArray: []}

呃,我只是觉得这可能不起作用,因为您需要将modifiedSince反馈给上游的fetchItem函数。你这儿没有自行车吗?这意味着你必须用一个主题来打破这种循环。或者,您可以尝试将modifiedSince封装在一个闭包中。类似的东西

function pollItems (fetchItems, polling_frequency) {
var modifiedSince = null;
var data$ = Rx.Observable.interval(polling_frequency)
  .startWith('run right away')
  .flatMap(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
    return fetchItems(modifiedSince);
  })
  .do(function(item) {
    if(item.updatedAt > modifiedSince) {
      modifiedSince = item.updatedAt;
    }
  })
  .scan(function(previous, current) {
    previous.push(current);
    return previous;
  }, []);
return data$;
}

我必须跑出去庆祝新年,如果这不起作用,我可以稍后再试一次(可能使用expand运算符,scan的另一个版本)。

这个怎么样:

var interval = 1000;
function fetchItems() {
    return items;
}
var data$ = Rx.Observable.interval(interval)
  .map(function() { return fetchItems(); })
  .filter(function(x) {return x.lastModified > Date.now() - interval}
  .skip(1)
  .startWith(fetchItems());

这应该只为新项目过滤源,并从完整的集合开始。只需编写适合您的数据源的筛选函数即可。

或者通过将参数传递给fetchItems:

var interval = 1000;
function fetchItems(modifiedSince) {
    var retVal = modifiedSince ? items.filter( function(x) {return x.lastModified > modifiedSince}) : items
    return retVal;
}
var data$ = Rx.Observable.interval(interval)
  .map(function() { return fetchItems(Date.now() - interval); })
  .skip(1)
  .startWith(fetchItems());