- MongoDB官方文档中文版
- MongoDB中文手册说明
- MongoDB简介
- 安装 MongoDB
- The mongo Shell
- MongoDB CRUD 操作
- 聚合
- 数据模型
- 事务
- 索引
- 安全
- 安全检查列表
- 启用访问控制
- 身份验证
- 基于角色的访问控制
- TLS / SSL(传输加密)
- 静态加密
- 客户端字段级加密
- 审计
- 网络和配置强化
- 实现字段级别修订
- 安全参考
- 附录
- 变更流
- 复制
- 分片
- 分片键
- 哈希分片
- 范围分片
- 区
- 管理分片区
- 按位置细分数据
- 用于更改SLA或SLO的分层硬件
- 按应用或客户细分数据
- 仅插入工作负载的分布式本地写入
- 管理分片区
- 使用块进行数据分区
- 在分片集群中拆分数据块
- 管理
- 存储
- 存储引擎
- 日志记录
- 管理日志记录
- GridFS
- FAQ:MongoDB 存储
- 存储引擎
- 参考
- 运算符
- 查询与映射运算符
- 更新运算符
- 聚合管道阶段
- 聚合管道操作符
- $abs (aggregation)
- $acos (aggregation)
- $acosh (aggregation)
- $add (aggregation)
- $addToSet (aggregation)
- $allElementsTrue (aggregation)
- $and (aggregation)
- $anyElementTrue (aggregation)
- $arrayElemAt (aggregation)
- $arrayToObject (aggregation)
- $asin (aggregation)
- $asinh (aggregation)
- $atan (aggregation)
- $atan2 (aggregation)
- $atanh (aggregation)
- $avg (aggregation)
- $ceil (aggregation)
- $cmp (aggregation)
- $concat (aggregation)
- $concatArrays (aggregation)
- $cond (aggregation)
- $convert (aggregation)
- $cos (aggregation)
- $dateFromParts (aggregation)
- $dateToParts (aggregation)
- $dateFromString (aggregation)
- $literal (aggregation)
- 查询修饰符
- 数据库命令
- 聚合命令
- 地理空间命令
- 查询和写操作命令
- 查询计划缓存命令
- 认证命令
- 用户管理命令
- 角色管理命令
- 复制命令
- 分片命令
- 会话命令
- 管理命令
- 诊断命令
- 免费监控命令
- 系统事件审计命令
- mongo Shell 方法
- 集合方法
- db.collection.aggregate()
- db.collection.bulkWrite()
- db.collection.copyTo()
- db.collection.count()
- db.collection.countDocuments()
- db.collection.estimatedDocumentCount()
- db.collection.createIndex()
- db.collection.createIndexes()
- db.collection.dataSize()
- db.collection.deleteOne()
- db.collection.deleteMany()
- db.collection.distinct()
- db.collection.drop()
- db.collection.dropIndex()
- db.collection.dropIndexes()
- db.collection.ensureIndex()
- db.collection.explain()
- db.collection.find()
- db.collection.findAndModify()
- db.collection.findOne()
- db.collection.findOneAndDelete()
- db.collection.findOneAndReplace()
- db.collection.findOneAndUpdate()
- db.collection.getIndexes()
- db.collection.getShardDistribution()
- db.collection.getShardVersion()
- db.collection.insert()
- db.collection.insertOne()
- db.collection.insertMany()
- db.collection.isCapped()
- db.collection.latencyStats()
- db.collection.mapReduce()
- db.collection.reIndex()
- db.collection.remove()
- db.collection.renameCollection()
- db.collection.replaceOne()
- db.collection.save()
- db.collection.stats()
- db.collection.storageSize()
- db.collection.totalIndexSize()
- db.collection.totalSize()
- db.collection.update()
- db.collection.updateOne()
- db.collection.updateMany()
- db.collection.watch()
- db.collection.validate()
- 词汇表
- 默认的MongoDB端口
- 默认的MongoDB读/写关注
- 服务器会话
- MongoDB驱动
- FAQ
- 联系我们
- 更多资料
- [快学Mongo]
- [Mongo问题讨论区]
- [Mongo 驱动使用手册]
- 本书使用 GitBook 发布
Map-Reduce转换到聚合管道
Map-Reduce转换到聚合管道
从4.4版本开始,MongoDB添加了$accumulator
和$function
aggregation运算符。这些运算符为用户提供了定义自定义聚合表达式的能力。使用这些操作,可以大致重写map-reduce表达式,如下表所示。
注意
可以使用聚合管道操作符(如$group、$merge等)重写各种map-reduce表达式,而不需要自定义函数。
例如,请参见map-reduce示例。
Map-Reduce到聚合管道转换表
这张表只是粗略的翻译。例如,该表显示了使用$project
的mapFunction
的近似转换。
- 然而,mapFunction逻辑可能需要额外的阶段,例如,如果逻辑包括对数组的迭代:
function() {
this.items.forEach(function(item){ emit(item.sku, 1); });
}
然后,聚合管道包括一个$unwind
和一个$project
:
{ $unwind: "$items "},
{ $project: { emits: { key: { "$items.sku" }, value: 1 } } },
$project
中的emit
字段可以被命名为其他名称。为了进行可视化比较,选择了字段名称emit。
Map-Reduce | Aggregation Pipeline |
---|---|
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: <collection> } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $out: <collection> } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $out: { db: <db>, coll: <collection> } } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $merge: { into: { db: <db>, coll: <collection>}, on: “_id” whenMatched: “replace”, whenNotMatched: “insert” } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $merge: { into: { db: <db>, coll: <collection> }, on: “_id” whenMatched: [ { $project: { value: { $function: { body: <reduceFunction>, args: [ “$_id”, [ “$value”, “$$new.value” ] ], lang: “js” } } } } ] whenNotMatched: “insert” } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { inline: 1 } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } } ] ) |
例子
可以使用聚合管道操作符(如$group
、$merge
等)重写各种map-reduce表达式,而不需要自定义函数。但是,为了说明目的,下面的例子提供了两种选择。
示例1
通过cust_id
对订单集合组执行以下map-reduce
操作,并计算每个cust_id
的价格总和:
var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)
备选方案1:(推荐)您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:
db.orders.aggregate([
{ $group: { _id: "$cust_id", value: { $sum: "$price" } } },
{ $out: "agg_alternative_1" }
])
备选方案2:(仅为说明目的)下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator
定义自定义函数:
db.orders.aggregate( [
{ $project: { emit: { key: "$cust_id", value: "$price" } } }, // equivalent to the map function
{ $group: { // equivalent to the reduce function
_id: "$emit.key",
valuesPrices: { $accumulator: {
init: function() { return 0; },
initArgs: [],
accumulate: function(state, value) { return state + value; },
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) { return state1 + state2; },
lang: "js"
} }
} },
{ $out: "agg_alternative_2" }
] )
首先,
$project
阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:key
包含cust_id
文档的值value
包含price
文档的值
{ "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } }
{ "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } }
{ "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } }
{ "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } }
{ "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } }
{ "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } }
{ "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } }
{ "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } }
{ "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } }
{ "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } }
- 然后,
$group
使用$accumulator
操作符来添加发出的值:
{ "_id" : "Don Quis", "valuesPrices" : 155 }
{ "_id" : "Cam Elot", "valuesPrices" : 60 }
{ "_id" : "Ant O. Knee", "valuesPrices" : 95 }
{ "_id" : "Busby Bee", "valuesPrices" : 125 }
- 最后,
$out
将输出写入集合agg_alternative_2
。或者,您可以使用$merge
而不是$out
。
示例2
以下字段对orders
集合组的map-reduce操作,item.sku
并计算每个sku的订单数量和总订购量。然后,该操作将为每个sku值计算每个订单的平均数量,并将结果合并到输出集合中。
var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = { count: 1, qty: this.items[idx].qty };
emit(key, value);
}
};
var reduceFunction2 = function(keySKU, countObjVals) {
reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};
var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};
db.orders.mapReduce(
mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example2" },
query: { ord_date: { $gte: new Date("2020-03-01") } },
finalize: finalizeFunction2
}
);
备选方案1:(推荐)您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:
db.orders.aggregate( [
{ $match: { ord_date: { $gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
{ $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
{ $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
备选方案2:(仅为说明目的)下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator
定义自定义函数:
db.orders.aggregate( [
{ $match: { ord_date: {$gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } },
{ $group: {
_id: "$emit.key",
value: { $accumulator: {
init: function() { return { count: 0, qty: 0 }; },
initArgs: [],
accumulate: function(state, value) {
state.count += value.count;
state.qty += value.qty;
return state;
},
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) {
return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
},
finalize: function(state) {
state.avg = state.qty / state.count;
return state;
},
lang: "js"}
}
} },
{ $merge: {
into: "agg_alternative_4",
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert"
} }
] )
$match
阶段只选择那些ord_date大于或等于new Date("2020-03-01")的文档。$unwinds
阶段按items数组字段分解文档,为每个数组元素输出一个文档。例如:
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
...
$project
阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:key
包含items.sku
值value
包含具有qty
值和count
值的文档
{ "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } }
{ "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
...
$group
使用$accumulator
操作符来添加发出的计数和数量,并计算avg字段:
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
- 最后,
$merge
将输出写入集合agg_alternative_4
。如果现有文档具有与新结果相同的键_id,则操作将覆盖现有文档。如果没有具有相同密钥的现有文档,操作将插入该文档。
也可以看看
聚合命令比较
译者:李冠飞
校对:
Copyright © 上海锦木信息技术有限公司 all right reserved,由 MongoDB汉化小组 提供技术支持文件修订时间: 2020-10-11 20:53:05