RxJS中的Rx.Observable.ForkJoin和并行异步(使用X射线)
Rx.Observable.ForkJoin in RxJS and parallel async (using X-Ray)
我正试图弄清楚如何使用lapwinglabs/xx-webscraper基于来自网站的解析数据流并行运行(在本例中为10)异步函数。
let pauser = new Rx.Subject()
let count = 0
let max = 10
// function that parse a single url to retrieve data
// return Observable
let parsing_each_link = url => {
return Rx.Observable.create(
observer => {
xray(url, selector)((err, data) => {
if (err) observer.onError(err)
observer.onNext(data)
observer.onCompleted()
})
})
}
// retrieve all the urls from a main page => node stream
let streamNode = xray(main_url, selector)
.paginate(some_selector)
.write()
.pipe(JSONStream.parse('*'))
// convert node stream to RxJS
let streamRx = RxNode.fromStream(streamNode)
.do(() => {
if (count === max) {
pauser.onNext(true)
count = 0
}
})
.do(() => count++)
.buffer(pauser) // take only 10 url by 10 url
streamRx.subscribe(
ten_urls => {
Rx.Observable.forkJoin(
ten_urls.map(url => parsing_each_link(url))
)
.subscribe(
x => console.log("Next : ", JSON.stringify(x, null, 4))
)
}
)
最后一个console.log上的Next从未被调用?!?
不可能确定,但如果你能确保ten_urls
按预期发射,那么下一步就是确保可观测的parsing_each_link
完成,因为forkJoin
将等待其每个源可观测的最后一个值。我在您的代码中看不到任何对observer.onComplete
的调用。
相关文章:
- 在指令控制器中使用$attrs时出现问题
- 如何使用jQuery自动打开页面上的所有链接
- 如何使用url加载程序在webpack中导入多个图像
- 使用php或javascript从facebook相册URL中删除多余的部分
- 正在添加'X'按钮,在文本字段旁边使用javascript
- 如何使用jquery在填充自动完成的值后使文本框只读
- 使用agility.js进行页面布局和合成
- 使用Clipboard.js复制span文本
- 使用JS如何动态更改显示的html文件中的文本背景颜色
- 如何使用WCF服务和javascript表单post上传.doc文件
- 使用 jQuery 的 .on 函数如何获取事件的原始元素
- 如何使用动画实现纸张推车
- 如何在Angular2中使用jQuery插件
- 使用Express捕获参数
- 使用clickToggle并在单击另一个元素时关闭元素
- 如何使用jQuery选择下拉列表的值
- 可以前端maven插件使用节点,npm已经安装
- 使用javascript将动态表从一个html页面打印到另一个html页
- RxJS中的Rx.Observable.ForkJoin和并行异步(使用X射线)
- 我如何在流星应用程序上使用x射线