- MongoDB官方文档中文版
- MongoDB中文手册说明
- MongoDB简介
- 安装 MongoDB
- The mongo Shell
- MongoDB CRUD 操作
- 聚合
- 数据模型
- 事务
- 索引
- 安全
- 安全检查列表
- 启用访问控制
- 身份验证
- 基于角色的访问控制
- TLS / SSL(传输加密)
- 静态加密
- 客户端字段级加密
- 审计
- 网络和配置强化
- 实现字段级别修订
- 安全参考
- 附录
- 变更流
- 复制
- 分片
- 分片键
- 哈希分片
- 范围分片
- 区
- 管理分片区
- 按位置细分数据
- 用于更改SLA或SLO的分层硬件
- 按应用或客户细分数据
- 仅插入工作负载的分布式本地写入
- 管理分片区
- 使用块进行数据分区
- 在分片集群中拆分数据块
- 管理
- 存储
- 存储引擎
- 日志记录
- 管理日志记录
- GridFS
- FAQ:MongoDB 存储
- 存储引擎
- 参考
- 运算符
- 查询与映射运算符
- 更新运算符
- 聚合管道阶段
- 聚合管道操作符
- $abs (aggregation)
- $acos (aggregation)
- $acosh (aggregation)
- $add (aggregation)
- $addToSet (aggregation)
- $allElementsTrue (aggregation)
- $and (aggregation)
- $anyElementTrue (aggregation)
- $arrayElemAt (aggregation)
- $arrayToObject (aggregation)
- $asin (aggregation)
- $asinh (aggregation)
- $atan (aggregation)
- $atan2 (aggregation)
- $atanh (aggregation)
- $avg (aggregation)
- $ceil (aggregation)
- $cmp (aggregation)
- $concat (aggregation)
- $concatArrays (aggregation)
- $cond (aggregation)
- $convert (aggregation)
- $cos (aggregation)
- $dateFromParts (aggregation)
- $dateToParts (aggregation)
- $dateFromString (aggregation)
- $literal (aggregation)
- 查询修饰符
- 数据库命令
- 聚合命令
- 地理空间命令
- 查询和写操作命令
- 查询计划缓存命令
- 认证命令
- 用户管理命令
- 角色管理命令
- 复制命令
- 分片命令
- 会话命令
- 管理命令
- 诊断命令
- 免费监控命令
- 系统事件审计命令
- mongo Shell 方法
- 集合方法
- db.collection.aggregate()
- db.collection.bulkWrite()
- db.collection.copyTo()
- db.collection.count()
- db.collection.countDocuments()
- db.collection.estimatedDocumentCount()
- db.collection.createIndex()
- db.collection.createIndexes()
- db.collection.dataSize()
- db.collection.deleteOne()
- db.collection.deleteMany()
- db.collection.distinct()
- db.collection.drop()
- db.collection.dropIndex()
- db.collection.dropIndexes()
- db.collection.ensureIndex()
- db.collection.explain()
- db.collection.find()
- db.collection.findAndModify()
- db.collection.findOne()
- db.collection.findOneAndDelete()
- db.collection.findOneAndReplace()
- db.collection.findOneAndUpdate()
- db.collection.getIndexes()
- db.collection.getShardDistribution()
- db.collection.getShardVersion()
- db.collection.insert()
- db.collection.insertOne()
- db.collection.insertMany()
- db.collection.isCapped()
- db.collection.latencyStats()
- db.collection.mapReduce()
- db.collection.reIndex()
- db.collection.remove()
- db.collection.renameCollection()
- db.collection.replaceOne()
- db.collection.save()
- db.collection.stats()
- db.collection.storageSize()
- db.collection.totalIndexSize()
- db.collection.totalSize()
- db.collection.update()
- db.collection.updateOne()
- db.collection.updateMany()
- db.collection.watch()
- db.collection.validate()
- 词汇表
- 默认的MongoDB端口
- 默认的MongoDB读/写关注
- 服务器会话
- MongoDB驱动
- FAQ
- 联系我们
- 更多资料
- [快学Mongo]
- [Mongo问题讨论区]
- [Mongo 驱动使用手册]
- 本书使用 GitBook 发布
执行增量 Map-Reduce
执行增量 Map-Reduce
在本页面
Map-reduce 操作可以处理复杂的聚合任务。要执行 map-reduce 操作,MongoDB 提供MapReduce命令,并在mongo shell 中提供db.collection.mapReduce() wrapper 方法。
如果 map-reduce 数据集不断增长,您可能希望执行增量 map-reduce 而不是每个 time 对整个数据集执行 map-reduce 操作。
执行增量 map-reduce:
在当前集合上运行 map-reduce job 并将结果输出到单独的集合。
如果有更多数据要进行 process,run 后续 map-reduce job:
query
参数指定仅匹配新文档的条件。out
参数,指定将新结果合并到现有输出集合中的reduce
操作。
请考虑以下 example,其中您在sessions
集合上安排 map-reduce 操作,以在每天结束时运行 run。
数据设置
sessions
集合包含 log 用户每天会话的文档,例如:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );
当前集合的初始 Map-Reduce
运行第一个 map-reduce 操作如下:
- 定义 map function _将
userid
映射到包含字段userid
,total_time
,count
和avg_time
的 object:
var mapFunction = function() {
var key = this.userid;
var value = {
userid: this.userid,
total_time: this.length,
count: 1,
avg_time: 0
};
emit( key, value );
};
- 使用两个 arguments
key
和values
定义相应的 reduce function 以计算总 time 和计数。key
对应于userid
,values
是 array,其元素对应于映射到mapFunction
中userid
的各个 object。
var reduceFunction = function(key, values) {
var reducedObject = {
userid: key,
total_time: 0,
count:0,
avg_time:0
};
values.forEach( function(value) {
reducedObject.total_time += value.total_time;
reducedObject.count += value.count;
});
return reducedObject;
};
- 使用两个 arguments
key
和reducedValue
定义 finalize function。 function 修改reducedValue
文档以添加另一个字段average
并返回修改后的文档。
var finalizeFunction = function (key, reducedValue) {
if (reducedValue.count > 0)
reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
return reducedValue;
};
- 使用
mapFunction
,reduceFunction
和finalizeFunction
函数在session
集合上执行 map-reduce。将结果输出到集合session_stat
。如果session_stat
集合已存在,则操作将替换内容:
db.sessions.mapReduce( mapFunction,
reduceFunction,
{
out: "session_stat",
finalize: finalizeFunction
}
)
- 查询
session_stats
集合以验证结果:
db.session_stats.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
后续增量 Map-Reduce
之后,随着sessions
集合的增长,您可以运行其他 map-reduce 操作。对于 example,将新文档添加到sessions
集合:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );
最终,对usersessions
集合执行增量map-reduce ,但使用该query
字段仅选择新文档。将结果输出到collection session_stats
,但是reduce
将内容与增量map-reduce的结果进行比较:
db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);
查询session_stats
集合以验证结果:
db.session_stats.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
聚合替代
前提条件:将集合设置为原始状态:
db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])
使用可用的聚合管道运算符,您可以重写map-reduce示例,而无需定义自定义函数:
db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
{ "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
{ "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
{ "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
- 该
$project
阶段调整输出文档的形状以反映map-reduce的输出,该输出具有两个字段_id
和value
。如果不需要镜像_id
andvalue
结构,则该阶段是可选的 。
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
该
$merge
阶段将结果输出到session_stats_agg
集合。如果现有文档_id
与新结果相同,则该操作将应用指定的管道,以根据结果和现有文档计算total_time,count和avg_time。如果是相同的,现有的文档_id
中session_stats_agg
,操作插入文档。查询
session_stats_agg
集合以验证结果:
db.session_stats_agg.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
- 新文档添加到
usersessions
集合中:
db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])
$match
在管道的开头添加一个阶段以指定日期过滤器:
db.usersessions.aggregate([
{ $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
- 查询
session_stats_agg
集合以验证结果:
db.session_stats_agg.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
- 可选的。为了避免
$match
每次运行时都必须修改聚合管道的日期条件,可以在帮助函数中定义包装聚合:
updateSessionStats = function(startDate) {
db.usersessions.aggregate([
{ $match: { ts: { $gte: startDate } } },
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
]);
};
然后,要运行,您只需将开始日期传递给该updateSessionStats()
函数:
updateSessionStats(ISODate('2020-03-05 00:00:00'))
也可以看看
译者:李冠飞
校对:
Copyright © 上海锦木信息技术有限公司 all right reserved,由 MongoDB汉化小组 提供技术支持文件修订时间: 2020-10-11 20:53:05