使用 RxJS 进行速率限制

Rate Limiting with RxJS

本文关键字:速率 RxJS 使用      更新时间:2023-09-26

我现在发现了RxJS,我的第一个尝试是试图实现API请求的速率限制。

不知何故,我错过了一些东西,并且只是"未定义"地进入输出。

我做错了什么?

const Rx = require('rx');
const request = require('request');
function f() {
  return Rx.Observable.from(arguments);
}
function expand(condensedId) {
  console.log('requesting', condensedId)
  return f(request(INDEX_URL + '/' + condensedId));
}
const INDEX_URL = 'http://jsonplaceholder.typicode.com/posts';
var source = f([1,2,3,4,5,6,7])
  .windowWithTimeOrCount(5000, 2)//rate limitation, 2 every 5 seconds
  .flatMap(condensed => expand(condensed))
  .map(entry => entry.title);
var subscription = source.subscribe(
  function (x) {
    console.log('title: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

Rx.Observable.from期望一个可迭代对象,我不认为对request()的响应是可迭代的。你可以传递一个返回承诺或可观察量的函数给flatMap,它将返回一个流,该流将发出已解析的数据。

因此,让我们使用 request-promise 而不是 request 并在 expand 函数中返回一个 Promise。此外,让我们使用 cheerio 库来提取 html 标题:

const Rx = require('rx');
const request = require('request-promise');
// HTML parsing library
const cheerio = require('cheerio');
function f() {
  return Rx.Observable.from(arguments);
}
const INDEX_URL = 'http://jsonplaceholder.typicode.com/posts';
// Return an Observable of resolved responses
function expand(condensedId$) {
  return condensedId$.flatMap(id => request(INDEX_URL + '/' + id));
}
var source = f([1,2,3,4,5,6,7])
  .windowWithTimeOrCount(5000, 2)//rate limitation, 2 every 5 seconds
  .flatMap(condensed => expand(condensed))
  .map(body => {
    const $ = cheerio.load(body);
    return $('title').text();
   });