mongo/ monid批量插入文档的MapReduce

mongo/mongoid MapReduce on batch inserted documents

本文关键字:文档 MapReduce 插入 monid mongo      更新时间:2023-09-26

我正在创建我的批处理并使用下面指定的命令将其插入集合

batch = []
time = 1.day.ago
(1..2000).each{ |i| a = {:name => 'invbatch2k'+i.to_s, :user_id =>  BSON::ObjectId.from_string('533956cd4d616323cf000000'), :out_id => 'out', :created_at => time, :updated_at => time, :random => '0.5' }; batch.push a; }
Invitation.collection.insert batch

如上所述,每个邀请记录的user_id字段值设置为'533956cd4d616323cf000000'

插入created_at: 1.day.ago后,我得到:

2.1.1 :102 > Invitation.lte(created_at: 1.week.ago).count
 => 48
2.1.1 :103 > Invitation.lte(created_at: Date.today).count
 => 2048

:

2.1.1 :104 > Invitation.lte(created_at: 1.week.ago).where(user_id: '533956cd4d616323cf000000').count
 => 14
2.1.1 :105 > Invitation.where(user_id: '533956cd4d616323cf000000').count
 => 2014

另外,我有一个map reduce,它计算每个唯一用户发送的邀请(总数和发送到唯一out_id)

class Invitation
  [...]
  def self.get_user_invites_count
    map = %q{
      function() {
        var user_id = this.user_id;
        emit(user_id, {user_id : this.user_id, out_id: this.out_id, count: 1, countUnique: 1})
      }
    }
    reduce = %q{
      function(key, values) {
        var result = {
          user_id: key,
          count: 0,
          countUnique : 0
        };
        var values_arr = [];
        values.forEach(function(value) {
          values_arr.push(value.out_id);
          result.count += 1
        });
        var unique = values_arr.filter(function(item, i, ar){ return ar.indexOf(item) === i; });
        result.countUnique = unique.length;
        return result;
      }
    }
    map_reduce(map,reduce).out(inline: true).to_a.map{|d| d['value']} rescue []
  end
end

问题是:

Invitation.lte(created_at: Date.today.end_of_day).get_user_invites_count

返回
[{"user_id"=>BSON::ObjectId('533956cd4d616323cf000000'), "count"=>49.0, "countUnique"=>2.0} ...]

代替"count" => 2014, "countUnique" => 6.0 while:

Invitation.lte(created_at: 1.week.ago).get_user_invites_count返回:

[{"user_id"=>BSON::ObjectId('533956cd4d616323cf000000'), "count"=>14.0, "countUnique"=>6.0} ...]

查询提供的数据,在插入批处理之前是准确的。

我不明白这里发生了什么。我错过什么了吗?

您在文档中遗漏的部分似乎是这里的问题:

MongoDB可以对同一个键多次调用reduce函数。在这种情况下,先前reduce函数对该键的输出将成为下一次调用该键的reduce函数的输入值之一。

还有后面的

返回对象的类型必须与map函数发出的值的类型相同,以确保以下操作为真:

所以你看到的是你的reduce函数返回了一个不同于它从映射器接收到的输入的签名。这一点很重要,因为在一次传递中,reducer可能无法获得给定键的所有值。相反,它会获取其中的一些,"减少"结果,并且减少的输出可能会在通过reduce函数的进一步传递中与键的其他值(可能也减少了)组合在一起。

由于您的字段不匹配,随后的reduce传递不会看到这些值,也不会计入您的总数。因此,您需要对齐值的签名:

  def self.get_user_invites_count
    map = %q{
      function() {
        var user_id = this.user_id;
        emit(user_id, {out_id: this.out_id, count: 1, countUnique: 0})
      }
    }
    reduce = %q{
      function(key, values) {
        var result = {
          out_id: null,
          count: 0,
          countUnique : 0
        };
        var values_arr = [];
        values.forEach(function(value) {
          if (value.out_id != null)
            values_arr.push(value.out_id);
          result.count += value.count;
          result.countUnique += value.countUnique;
        });
        var unique = values_arr.filter(function(item, i, ar){ return ar.indexOf(item) === i; });
        result.countUnique += unique.length;
        return result;
      }
    }
    map_reduce(map,reduce).out(inline: true).to_a.map{|d| d['value']} rescue []
  end

您也不需要在发出或保留的值中使用user_id,因为它已经是mapReduce的"键"值。其余的修改考虑到"count"answers"countUnique"都可以包含需要考虑的现有值,而您只是在每次传递时将值重置为0。

当然,如果"输入"已经通过了"减少"通道,那么你不需要"out_id"值被过滤为"唯一性",因为你已经有了计数,现在包括了。因此,任何null值都不会被添加到要计数的数组中,它也会被"添加"到总数中,而不是替换它。

所以reducer 会被多次调用。对于20个键值,输入可能不会被分割,这就是为什么输入较少的示例可以工作的原因。对于更多的情况,相同键值的"组"将被拆分,这就是mapReduce优化大型数据处理的方式。由于"减少"的输出将再次发送回减速器,您需要注意,您正在考虑在前一遍中已经发送到输出的值。