- MongoDB官方文档中文版
- MongoDB中文手册说明
- MongoDB简介
- 安装 MongoDB
- The mongo Shell
- MongoDB CRUD 操作
- 聚合
- 数据模型
- 事务
- 索引
- 安全
- 安全检查列表
- 启用访问控制
- 身份验证
- 基于角色的访问控制
- TLS / SSL(传输加密)
- 静态加密
- 客户端字段级加密
- 审计
- 网络和配置强化
- 实现字段级别修订
- 安全参考
- 附录
- 变更流
- 复制
- 分片
- 分片键
- 哈希分片
- 范围分片
- 区
- 管理分片区
- 按位置细分数据
- 用于更改SLA或SLO的分层硬件
- 按应用或客户细分数据
- 仅插入工作负载的分布式本地写入
- 管理分片区
- 使用块进行数据分区
- 在分片集群中拆分数据块
- 管理
- 存储
- 存储引擎
- 日志记录
- 管理日志记录
- GridFS
- FAQ:MongoDB 存储
- 存储引擎
- 参考
- 运算符
- 查询与映射运算符
- 更新运算符
- 聚合管道阶段
- 聚合管道操作符
- $abs (aggregation)
- $acos (aggregation)
- $acosh (aggregation)
- $add (aggregation)
- $addToSet (aggregation)
- $allElementsTrue (aggregation)
- $and (aggregation)
- $anyElementTrue (aggregation)
- $arrayElemAt (aggregation)
- $arrayToObject (aggregation)
- $asin (aggregation)
- $asinh (aggregation)
- $atan (aggregation)
- $atan2 (aggregation)
- $atanh (aggregation)
- $avg (aggregation)
- $ceil (aggregation)
- $cmp (aggregation)
- $concat (aggregation)
- $concatArrays (aggregation)
- $cond (aggregation)
- $convert (aggregation)
- $cos (aggregation)
- $dateFromParts (aggregation)
- $dateToParts (aggregation)
- $dateFromString (aggregation)
- $literal (aggregation)
- 查询修饰符
- 数据库命令
- 聚合命令
- 地理空间命令
- 查询和写操作命令
- 查询计划缓存命令
- 认证命令
- 用户管理命令
- 角色管理命令
- 复制命令
- 分片命令
- 会话命令
- 管理命令
- 诊断命令
- 免费监控命令
- 系统事件审计命令
- mongo Shell 方法
- 集合方法
- db.collection.aggregate()
- db.collection.bulkWrite()
- db.collection.copyTo()
- db.collection.count()
- db.collection.countDocuments()
- db.collection.estimatedDocumentCount()
- db.collection.createIndex()
- db.collection.createIndexes()
- db.collection.dataSize()
- db.collection.deleteOne()
- db.collection.deleteMany()
- db.collection.distinct()
- db.collection.drop()
- db.collection.dropIndex()
- db.collection.dropIndexes()
- db.collection.ensureIndex()
- db.collection.explain()
- db.collection.find()
- db.collection.findAndModify()
- db.collection.findOne()
- db.collection.findOneAndDelete()
- db.collection.findOneAndReplace()
- db.collection.findOneAndUpdate()
- db.collection.getIndexes()
- db.collection.getShardDistribution()
- db.collection.getShardVersion()
- db.collection.insert()
- db.collection.insertOne()
- db.collection.insertMany()
- db.collection.isCapped()
- db.collection.latencyStats()
- db.collection.mapReduce()
- db.collection.reIndex()
- db.collection.remove()
- db.collection.renameCollection()
- db.collection.replaceOne()
- db.collection.save()
- db.collection.stats()
- db.collection.storageSize()
- db.collection.totalIndexSize()
- db.collection.totalSize()
- db.collection.update()
- db.collection.updateOne()
- db.collection.updateMany()
- db.collection.watch()
- db.collection.validate()
- 词汇表
- 默认的MongoDB端口
- 默认的MongoDB读/写关注
- 服务器会话
- MongoDB驱动
- FAQ
- 联系我们
- 更多资料
- [快学Mongo]
- [Mongo问题讨论区]
- [Mongo 驱动使用手册]
- 本书使用 GitBook 发布
Map-Reduce 示例
Map-Reduce 例子
在本页面
在mongo shell 中,db.collection.mapReduce()方法是MapReduce命令周围的 wrapper。以下示例使用db.collection.mapReduce()方法:
聚合管道作为替代
聚合管道 比map-reduce提供更好的性能和更一致的接口。
各种map-reduce表达式可以使用被重写聚合管道运算符,诸如
$group
,$merge
等下面的示例包括聚合管道备选方案。
orders
使用以下文档创建样本集合:
db.orders.insertMany([
{ _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" },
{ _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" },
{ _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"},
{ _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }
])
返回每位客户的总价格
对orders
集合执行map-reduce操作,以对进行分组cust_id
,并计算price
每个的 的总和cust_id
:
定义map函数来处理每个输入文档:
在函数中,
this
指的是map-reduce操作正在处理的文档。该函数将映射
price
到cust_id
每个文档的,并发出cust_id
和price
对。
var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
使用两个参数
keyCustId
和定义相应的reduce函数valuesPrices
:valuesPrices
是一个数组,其元素是price
由map功能发射并由分组值keyCustId
。该函数将
valuesPrice
数组简化为其元素的总和。
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
orders
使用mapFunction1
map函数和reduceFunction1
reduce函数对集合中的所有文档执行map-reduce 。
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)
此操作将结果输出到名为的集合 map_reduce_example
。如果map_reduce_example
集合已经存在,则该操作将用此map-reduce操作的结果替换内容。
- 查询
map_reduce_example
集合以验证结果:
db.map_reduce_example.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "Ant O. Knee", "value" : 95 }
{ "_id" : "Busby Bee", "value" : 125 }
{ "_id" : "Cam Elot", "value" : 60 }
{ "_id" : "Don Quis", "value" : 155 }
聚合替代
使用可用的聚合管道运算符,您可以重写map-reduce操作,而无需定义自定义函数:
db.orders.aggregate([
{ $group: { _id: "$cust_id", value: { $sum: "$price" } } },
{ $out: "agg_alternative_1" }
])
$group
由平台组cust_id
并计算value
字段(参见$sum
)。该value
字段包含price
每个的总计cust_id
。
该阶段将以下文档输出到下一阶段:
{ "_id" : "Don Quis", "value" : 155 }
{ "_id" : "Ant O. Knee", "value" : 95 }
{ "_id" : "Cam Elot", "value" : 60 }
{ "_id" : "Busby Bee", "value" : 125 }
db.agg_alternative_1.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "Ant O. Knee", "value" : 95 }
{ "_id" : "Busby Bee", "value" : 125 }
{ "_id" : "Cam Elot", "value" : 60 }
{ "_id" : "Don Quis", "value" : 155 }
用每个项目的平均数量计算订单和总数量
在此示例中,您将对值大于或等于的orders
所有文档在集合上执行map-reduce操作 。工序按字段分组 ,并计算每个的订单数量和总订购量。然后,该操作将为每个值计算每个订单的平均数量,并将结果合并到输出集合中。合并结果时,如果现有文档的密钥与新结果相同,则该操作将覆盖现有文档。如果不存在具有相同密钥的文档,则该操作将插入该文档。
定义map函数来处理每个输入文档:
- 在函数中,
this
指的是map-reduce操作正在处理的文档。 - 对于每个商品,该函数将其
sku
与一个新对象相关联,该对象value
包含订单的count
of1
和该商品qty
,并发出sku
andvalue
对。
- 在函数中,
var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = { count: 1, qty: this.items[idx].qty };
emit(key, value);
}
};
使用两个参数
keySKU
和定义相应的reduce函数countObjVals
:countObjVals
是一个数组,其元素是映射到keySKU
由map函数传递给reducer函数的分组值的对象。- 该函数将
countObjVals
数组简化为reducedValue
包含count
和qty
字段的单个对象。 - 在中
reducedVal
,该count
字段包含count
各个数组元素的qty
字段总和,而该字段包含各个数组元素的 字段总和qty
。
var reduceFunction2 = function(keySKU, countObjVals) {
reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};
- 定义有两个参数的函数确定
key
和reducedVal
。该函数修改reducedVal
对象以添加一个名为avg
的计算字段,并返回修改后的对象:
var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};
- 在执行的map-reduce操作
orders
使用集合mapFunction2
,reduceFunction2
和finalizeFunction2
功能。
db.orders.mapReduce(
mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example2" },
query: { ord_date: { $gte: new Date("2020-03-01") } },
finalize: finalizeFunction2
}
);
此操作使用该query
字段选择仅ord_date
大于或等于的那些文档。然后将结果输出到集合 。new Date("2020-03-01")
map_reduce_example2
如果map_reduce_example2
集合已经存在,则该操作会将现有内容与此map-reduce操作的结果合并。也就是说,如果现有文档具有与新结果相同的密钥,则该操作将覆盖现有文档。如果不存在具有相同密钥的文档,则该操作将插入该文档。
- 查询
map_reduce_example2
集合以验证结果:
db.map_reduce_example2.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "apples", "value" : { "count" : 3, "qty" : 30, "avg" : 10 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 6, "qty" : 58, "avg" : 9.666666666666666 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
聚合替代
使用可用的聚合管道运算符,您可以重写map-reduce操作,而无需定义自定义函数:
db.orders.aggregate( [
{ $match: { ord_date: { $gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
{ $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
{ $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
该
$match
阶段仅选择ord_date
大于或等于new Date("2020-03-01")
的那些文档。该
$unwinds
阶段按items
数组字段细分文档,以输出每个数组元素的文档。例如:
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
...
$group
由平台组items.sku
,计算每个SKU:- 该
qty
字段。该qty
字段包含qty
每个订单的总数items.sku
(请参阅参考资料$sum
)。 orders_ids
列表。该orders_ids
字段包含不同顺序的列表_id
的对items.sku
(参见$addToSet
)。
- 该
{ "_id" : "chocolates", "qty" : 15, "orders_ids" : [ 2, 5, 8 ] }
{ "_id" : "oranges", "qty" : 63, "orders_ids" : [ 4, 7, 3, 2, 9, 1, 10 ] }
{ "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] }
{ "_id" : "apples", "qty" : 35, "orders_ids" : [ 9, 8, 1, 6 ] }
{ "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] }
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
最后,
$merge
将输出写入collectionagg_alternative_3
。如果现有文档的密钥_id
与新结果相同,则该操作将覆盖现有文档。如果不存在具有相同密钥的文档,则该操作将插入该文档。查询
agg_alternative_3
集合以验证结果:
db.agg_alternative_3.find().sort( { _id: 1 } )
该操作返回以下文档:
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
译者:李冠飞
校对:
Copyright © 上海锦木信息技术有限公司 all right reserved,由 MongoDB汉化小组 提供技术支持文件修订时间: 2020-10-11 20:53:05