RxJS中的Rx.Observable.ForkJoin和并行异步(使用X射线)

Rx.Observable.ForkJoin in RxJS and parallel async (using X-Ray)

本文关键字:使用 射线 异步 并行 Rx 中的 Observable ForkJoin RxJS      更新时间:2023-09-26

我正试图弄清楚如何使用lapwinglabs/xx-webscraper基于来自网站的解析数据流并行运行(在本例中为10)异步函数。

let pauser = new Rx.Subject()
let count = 0
let max = 10
// function that parse a single url to retrieve data
// return Observable
let parsing_each_link = url => {
   return Rx.Observable.create(
      observer => {
         xray(url, selector)((err, data) => {
            if (err) observer.onError(err)
            observer.onNext(data)
            observer.onCompleted()
         })
    })
}
 
// retrieve all the urls from a main page => node stream
let streamNode = xray(main_url, selector)
   .paginate(some_selector)
   .write()
   .pipe(JSONStream.parse('*'))
// convert node stream to RxJS
let streamRx = RxNode.fromStream(streamNode)
   .do(() => {
      if (count === max) {
         pauser.onNext(true)
         count = 0
      }
   })
   .do(() => count++)
   .buffer(pauser) // take only 10 url by 10 url
   
streamRx.subscribe(
   ten_urls => {
      Rx.Observable.forkJoin(
         ten_urls.map(url => parsing_each_link(url))
      )
      .subscribe(
         x => console.log("Next : ", JSON.stringify(x, null, 4))
      )
   }
)

最后一个console.log上的Next从未被调用?!?

不可能确定,但如果你能确保ten_urls按预期发射,那么下一步就是确保可观测的parsing_each_link完成,因为forkJoin将等待其每个源可观测的最后一个值。我在您的代码中看不到任何对observer.onComplete的调用。