RXJS 中的分页数据光标以及对 subject.onDone 和错误的混淆

Paginated data cursor in RXJS and confusion about subject.onCompleted and errors

本文关键字:onDone subject 错误 分页 数据 光标 RXJS      更新时间:2023-09-26

我正在使用RXJS,并提出了分页数据光标的实现。我没有花太多时间在响应式函数式编程上,我想知道我的实现是否符合这个库的使用方式。

我想要一个可以从端点加载页面的类。如果您订阅它,您将收到查询的最后一页。第一个订阅会导致自动查询第一页。对"getPage"的调用应为所有订阅触发onNext。多个订阅不应导致多个请求。

我写了一个满足这一点的基本例子,在我的思考过程中进行了大量评论:https://jsfiddle.net/gfmn708g/1/

我的问题是:

  • 这符合RXJS的精神吗?同时使用 ReplySubject 和 shareReplay 对我来说感觉不对,但我找不到其他方法来获得我想要的行为。我读到使用主题是"不好的",违背了范式的原则。
  • 完成
  • 并处理所有正在进行的请求后,第 63 行是否会取消订阅/完成所有 items$ 订阅(第 82 行和第 89 行(?
  • 处理错误的正确
  • 方法是什么,以便将错误传播到订阅者,但它们不会谋杀流并阻止我推送更多请求?

(这是根据SO的问题指南列出的代码(

const logDiv = $("#log");
function log(message, cls) {
    logDiv.append($("<li>").text(message).addClass(cls));
}
/* interface IRequest {
    url: string;
    page: number:
    refresh?: boolean
}
interface IEndpoint {
    get(request: IRequest): [];
} */
// Class that represents a cursor into paginated data
function PagedData(endpoint, url) {
    this._endpoint = endpoint;
    this._url = url;
    // Our request queue is an observable of structurs of type IRequest
    // We use a reply subject so that the last URL requested is in the stream when the first subscriber subscribes.
    this._requestQueue = new Rx.ReplaySubject(1);
    // This is our data observable, subscribe to it to
    // A) receive the last page that this cursor has produced
    // B) receive all future pages
    this.items$ = this._requestQueue
        // Don't re-query unless the "refresh" boolean is true
        .distinctUntilChanged(req => req, (left, right) => right.refresh ? false : left.page == right.page)
        // Make the request...
        .flatMapLatest(request => Rx.Observable.of(request).zip(this._endpoint.get(request)))
        // Wrap data returned with an envelope with data such as which page was requested
        .map(data => {
            const request = data[0];
            const response = data[1];
            return {
                page: request.page,
                url: request.url,
                items: response
            };              
        })
        // Replay last page worth of data on each subscription
        .shareReplay(1);
    // Queue up the first page to be retrieved on first subscription
    this.getPage(1);
}
PagedData.prototype.getPage = function(page, refresh) {
    refresh = refresh || false;
    // Fire off the workflow
    this._requestQueue.onNext({
        url: this._url,
        refresh: refresh,
        page: page
    });
}
PagedData.prototype.dispose = function() {
    // Question: this should unsubscribe ALL of the subscriptions to this.items$, right?
    this._requestQueue.completed();
}
// -----------------
// EXAMPLE USAGE
var dummyEndpoint = {
    get(request) {
        log(`GET: ${request.url} at page ${request.page}`, "service");
        return Rx.Observable.range(request.page * 10, 10)
            .delay(1000)
            .map(i => ({id: i, title: `Track ${i}`}))
            .toArray();
    }
};
const tracks = new PagedData(dummyEndpoint, "/api/tracks");
// This results in getting the first page
tracks.items$.subscribe(data => {
    log(`On page ${data.page}, ${data.items.map(i => i.title).join(",")}`, "first")
});
// Wait one second after getting the first page
window.setTimeout(() => {
    // Subscribe again, we will receive the first page with no re-query
    tracks.items$.subscribe(data => log(`Got page ${data.page} after delay`, "second"));
    // Get the second page
    tracks.getPage(2);
    // Wait another second after getting the second page
    window.setTimeout(() => {
        log("Getting second page (without refresh)");
        // This shouldn't result in anything, since "refresh" is false/undefined
        tracks.getPage(2);
        // Wait one more second...
        window.setTimeout(() => {
            log("Getting second page (with refresh)");
            // This should result in getting the second page, refresh is true
            tracks.getPage(2, true);
      // Should get rid of all subscriptions after the last in-flight request?
            tracks.dispose();
        }, 1000);
    }, 2000);
}, 2000);
与其说

Subjects不好,不如说它们往往是新用户的拐杖,所以他们不必实际使用范式(一个可观察量和观察者的价格,我怎么能使用它?

严肃地说,虽然我认为你在这方面的直觉是正确的,但使用 ReplaySubject + shareReplay 是一种代码气味。可能会有所帮助的是尝试思考您的数据实际来自何处。在大多数情况下,函数本身并不存在,它们实际上是由其他东西触发的。

你需要找到其他东西是什么,并遵循它,直到你找到根来源。在大多数情况下,此源将是用户或网络事件,您可以使用 fromEventfromPromise 包装。一旦你有了那个起点,就只需将该源连接到你想要做的事情。

因此,我将重构将终结点调用的业务逻辑转换为Observable扩展:

Rx.Observable.prototype.paginate = function(endpoint, url) {
  return this
    .startWith({
      page: 1,
      refresh: false
    })
    .map(req => 
      ({page: req.page,url: url,refresh: req.refresh}))
    .distinctUntilChanged(req => req,
      (left, right) => right.refresh ? false :
      left.page == right.page)
    .flatMapLatest(request => endpoint.get(request),
      (request, response) => ({
        page: request.page,
        url: request.url,
        items: response
      }))
    .shareReplay(1)
}

上述内容将等待第一个订阅,然后在该订阅发生时自动发出第一个请求。之后,每个后续订阅者将从分页接收最新值。

从那里开始,这将取决于您的来源,但我想您可能会做这样的事情:

var trigger = Rx.Observable.fromEvent($nextPageButton, 'click')
  .scan((current, _) => current + 1, 1)
  .paginate(endpoint, url);

trigger.subscribe(/*Handle result*/);

在这种情况下,您可能不会取消订阅,直到您的页面需要卸载,而您只需在加载时挂接您的管道,它会处理其余的工作。一直以来,订阅trigger总会为您提供最新数据。

我使用现有代码的重构添加了一个工作示例。

const logDiv = $("#log");
function log(message, cls) {
  logDiv.append($("<li>").text(message).addClass(cls));
}
/* interface IRequest {
	url: string;
	page: number:
	refresh?: boolean
}
interface IEndpoint {
	get(request: IRequest): [];
} */
Rx.Observable.prototype.paginate = function(endpoint, url) {
  return this
    .startWith({
      page: 1,
      refresh: false
    })
    .map(req => 
      ({page: req.page,url: url,refresh: req.refresh}))
    .distinctUntilChanged(req => req,
      (left, right) => right.refresh ? false :
      left.page == right.page)
    .flatMapLatest(request => endpoint.get(request),
      (request, response) => ({
        page: request.page,
        url: request.url,
        items: response
      }))
    .shareReplay(1)
}
// -----------------
// EXAMPLE USAGE
var dummyEndpoint = {
  get(request) {
    log(`GET: ${request.url} at page ${request.page} with${request.refresh ? "" : "out"} refresh`, "service");
    return Rx.Observable.range(request.page * 10, 10)
      .delay(1000)
      .map(i => ({
        id: i,
        title: `Track ${i}`
      }))
      .toArray();
  }
};
var trigger = Rx.Observable.concat(
  Rx.Observable.just({
    page: 2
  }).delay(2000),
  Rx.Observable.just({
    page: 2
  }).delay(2000),
  Rx.Observable.just({
    page: 2,
    refresh: true
  }).delay(1000)
);
const tracks = trigger.paginate(dummyEndpoint, "/api/tracks");
tracks.delaySubscription(2000).subscribe(data => log(`Got page ${data.page} after delay`, "second"));
// This results in getting the first page
tracks.subscribe(data => {
  log(`On page ${data.page}, ${data.items.map(i => i.title).join(",")}`, "first")
});
#log li.first {
  color: green;
}
#log li.second {
  color: blue;
}
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
<ol id="log">
</ol>