使用 RxJS 进行速率限制
Rate Limiting with RxJS
我现在发现了RxJS,我的第一个尝试是试图实现API请求的速率限制。
不知何故,我错过了一些东西,并且只是"未定义"地进入输出。
我做错了什么?
const Rx = require('rx');
const request = require('request');
function f() {
return Rx.Observable.from(arguments);
}
function expand(condensedId) {
console.log('requesting', condensedId)
return f(request(INDEX_URL + '/' + condensedId));
}
const INDEX_URL = 'http://jsonplaceholder.typicode.com/posts';
var source = f([1,2,3,4,5,6,7])
.windowWithTimeOrCount(5000, 2)//rate limitation, 2 every 5 seconds
.flatMap(condensed => expand(condensed))
.map(entry => entry.title);
var subscription = source.subscribe(
function (x) {
console.log('title: %s', x);
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
Rx.Observable.from
期望一个可迭代对象,我不认为对request()
的响应是可迭代的。你可以传递一个返回承诺或可观察量的函数给flatMap
,它将返回一个流,该流将发出已解析的数据。
因此,让我们使用 request-promise
而不是 request
并在 expand
函数中返回一个 Promise。此外,让我们使用 cheerio
库来提取 html 标题:
const Rx = require('rx');
const request = require('request-promise');
// HTML parsing library
const cheerio = require('cheerio');
function f() {
return Rx.Observable.from(arguments);
}
const INDEX_URL = 'http://jsonplaceholder.typicode.com/posts';
// Return an Observable of resolved responses
function expand(condensedId$) {
return condensedId$.flatMap(id => request(INDEX_URL + '/' + id));
}
var source = f([1,2,3,4,5,6,7])
.windowWithTimeOrCount(5000, 2)//rate limitation, 2 every 5 seconds
.flatMap(condensed => expand(condensed))
.map(body => {
const $ = cheerio.load(body);
return $('title').text();
});
相关文章:
- 您的平台不支持RxJS-Array.observe
- 如何使用(this)访问Angular 2 http rxjs catch函数中的对象属性
- 在rxjs中巧妙的蒸汽可观察合并
- http/rxjs catch回调中的Angular 2重定向导致TypeError:无法读取属性'订阅'
- RxJS等待承诺解决
- 如何在RxJS 5中创建Hot Observable
- RxJS油门行为;立即获取第一个值
- 如何获取RxJS Subject或Observable的当前值
- 在webrtc中实时控制视频发送帧速率
- 困在使用RxJS删除所有计数器应用程序中
- 使用RxJS模拟命令队列和撤消堆栈
- 如何仅在RxJs中可观察到的源发出的特定错误上重试
- RxJS:在循环中处理错误.js自定义驱动程序
- 最后,在 rxjs 序列上,在第一个错误时执行
- 如果用户在输入上按 Enter 键,则取消挖空速率限制扩展器
- 如何在 jquery 中的速率按钮下方显示消息状态
- RXJS 为什么只有最后一个数字同时有 a 和 b
- 使用rxjs创建一个可观察的对象,该对象稍后将连接到web套接字
- 正在寻找在rxjs中使用scan的更干净的方法
- 使用 RxJS 进行速率限制