帖子的 RX JS 聚合

rx js aggregation for posts

本文关键字:JS 聚合 RX      更新时间:2023-09-26

我正在尝试使用 rx for js 来理解函数式编程。

我有一个发出"post"对象的 Rx.Observable:

每个帖子看起来像这样:

{
title: "sometitle",
author: "someauthor"
text: "sometext",
date: "somedate",
tags: ['tag1', 'tag2', ..., 'tagN']
}

我想将该序列转换为发出以下信号的序列:

{ 
tag: 'tagname',
postCount: n
}

这是我到目前为止所拥有的:

function tags(post) {     
   return post
            .tags
            .map(function(tag) { return { 'tag': tag, 'count': 1});     
}
posts
  .flatMap(tags)
  .groupBy(function(tagged) { return tagged.tag }) 
  . // don't know how to continue 

正如我之前所说,我的目标是创建一个为每个标签发出{tag: 'tagname', postCount: n }的序列/可观察量

提前感谢

编辑:

忘了提到我正在寻找"面向节点"的答案。

这就是我目前所拥有的。它有效,但我不确定{ ..., count: 1 }部分。我正在寻找一个更"优雅"的解决方案。

posts
    .flatMap(tags)
    .map((tag) => {return {name: tag, count: 1}})
    .groupBy((tagcount) => {return tagcount.name})
    .flatMap((taggroup) => {return taggroup.reduce((a,x) => {return {tag: x.name, count: (a.count + x.count)}})})

它会是这样的:

// sequesnce of posts sequence with 10ms interval
var posts = Rx.Observable
  .fromArray([
    { tags: ['tag1', 'tag2'] },
    { tags: ['tag1', 'tag3'] },
    { tags: ['tag1'] },
    { tags: ['tag1', 'tag2', 'tag3'] }
  ])
  .zip(Rx.Observable.interval(10), Rx.helpers.identity)
  .do(logger('post:'));
// sequence of post counts by tags, and count changes
var tagsCountChanges = posts.scan(
  function (acc, post) {
    var counts = acc.counts;
    var changes = [];
    post.tags.forEach(function (tag) {
      counts[tag] = (counts[tag] || 0) + 1;
      changes.push({ tag: tag, postsCount: counts[tag] });
    });
    return { counts, changes };
  }, { counts: {}, changes: [] })
  .map(acc => acc.changes)
  .do(logger('tagsCountChanges:'));
var tagCountUpdates = tagsCountChanges
  .concatMap(function (changes) {
    return Rx.Observable
      .fromArray(changes);
  });
tagCountUpdates
  .forEach(logger('tagPostCounts:'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.1.0/rx.all.js"></script>
<pre id="log"></pre>
<script>
  var log = document.getElementById('log');
  function logger(label) {
    return function(item) {
      log.appendChild(document.createTextNode(label + ' ' + JSON.stringify(item, null, 2) + ''n'));
    };
  }
</script>

更新(响应编辑1):

它也将在节点中工作:)您也可以删除记录器和帖子序列的间隔 - 它只是为了在浏览器中运行代码段时显示带有中间可观察量的良好项目日志。

我不确定{ ..., count: 1 }部分。 我正在寻找一个更"优雅"的解决方案。

实际上,您可以完全删除{ ..., count: 1 }部分:

posts
    .flatMap(post => post.tags)
    .groupBy(Rx.helpers.identity)
    .flatMap(taggroup$ => 
       taggroup$.reduce((acc,tag) => {return {tag, count: acc.count+1}}, {count:0})
    )

关于优雅:我喜欢你的解决方案 - 我认为它比我的更有表现力,更简单。但是,我的解决方案在较大的标签计数下性能会更高(因为它不会为每个标签创建内部可观察量)。

此外,我的解决方案与您的解决方案略有不同 - 它将发出标签计数更改流,而不仅仅是最终计数(帖子流完成后)。

可以轻松修改您的解决方案以达到相同的结果 - 只需将reduce替换为scan即可。

签证反之亦然 - 如果只需要总数,我的解决方案可以简化很多:

posts.reduce(
  (counts, post) => {
    post.tags.forEach(tag => {
      counts[tag] = (counts[tag] || 0) + 1;
    });
    return counts;
  }, {})
  .flatMap(counts => 
     Object.keys(counts).map(
        tag => ({tag, count: counts[tag]})
     )
  )