Map-Reduce转换到聚合管道

    从4.4版本开始,MongoDB添加了$accumulator$function aggregation运算符。这些运算符为用户提供了定义自定义聚合表达式的能力。使用这些操作,可以大致重写map-reduce表达式,如下表所示。

    注意

    可以使用聚合管道操作符(如$group、$merge等)重写各种map-reduce表达式,而不需要自定义函数。

    例如,请参见map-reduce示例。

    Map-Reduce到聚合管道转换表

    这张表只是粗略的翻译。例如,该表显示了使用$projectmapFunction的近似转换。

    • 然而,mapFunction逻辑可能需要额外的阶段,例如,如果逻辑包括对数组的迭代:

      1. function() {
      2. this.items.forEach(function(item){ emit(item.sku, 1); });
      3. }

      然后,聚合管道包括一个$unwind和一个$project:

      1. { $unwind: "$items "},
      2. { $project: { emits: { key: { "$items.sku" }, value: 1 } } },
    • $project中的emit字段可以被命名为其他名称。为了进行可视化比较,选择了字段名称emit。

    Map-Reduce Aggregation Pipeline
    db.collection.mapReduce(
    <mapFunction>,
    <reduceFunction>,
    {
    query: <queryFilter>,
    sort: <sortOrder>,
    limit: <number>,
    finalize: <finalizeFunction>,
    out: <collection>
    } )
    db.collection.aggregate( [
    { $match: <queryFilter> },
    { $sort: <sortOrder> },
    { $limit: <number> },
    { $project: { emits: { k: <expression>, v: <expression> } } },
    { $unwind: “$emits” },
    { $group: {
    _id: “$emits.k”},
    value: { $accumulator: {
    init: <initCode>,
    accumulate: <reduceFunction>,
    accumulateArgs: [ “$emit.v”],
    merge: <reduceFunction>,
    finalize: <finalizeFunction>,
    lang: “js” }}
    } },
    { $out: <collection> }
    ] )
    db.collection.mapReduce(
    <mapFunction>,
    <reduceFunction>,
    {
    query: <queryFilter>,
    sort: <sortOrder>,
    limit: <number>,
    finalize: <finalizeFunction>,
    out: { merge: <collection>, db: <db> }
    }
    )
    db.collection.aggregate( [
    { $match: <queryFilter> },
    { $sort: <sortOrder> },
    { $limit: <number> },
    { $project: { emits: { k: <expression>, v: <expression> } } },
    { $unwind: “$emits” },
    { $group: {
    _id: “$emits.k”},
    value: { $accumulator: {
    init: <initCode>,
    accumulate: <reduceFunction>,
    accumulateArgs: [ “$emit.v”],
    merge: <reduceFunction>,
    finalize: <finalizeFunction>,
    lang: “js” }}
    } },
    { $out: { db: <db>, coll: <collection> } }
    ] )
    db.collection.mapReduce(
    <mapFunction>,
    <reduceFunction>,
    {
    query: <queryFilter>,
    sort: <sortOrder>,
    limit: <number>,
    finalize: <finalizeFunction>,
    out: { merge: <collection>, db: <db> }
    }
    )
    db.collection.aggregate( [
    { $match: <queryFilter> },
    { $sort: <sortOrder> },
    { $limit: <number> },
    { $project: { emits: { k: <expression>, v: <expression> } } },
    { $unwind: “$emits” },
    { $group: {
    _id: “$emits.k”},
    value: { $accumulator: {
    init: <initCode>,
    accumulate: <reduceFunction>,
    accumulateArgs: [ “$emit.v”],
    merge: <reduceFunction>,
    finalize: <finalizeFunction>,
    lang: “js” }}
    } },
    { $merge: {
    into: { db: <db>, coll: <collection>},
    on: “_id”
    whenMatched: “replace”,
    whenNotMatched: “insert”
    } },
    ] )
    db.collection.mapReduce(
    <mapFunction>,
    <reduceFunction>,
    {
    query: <queryFilter>,
    sort: <sortOrder>,
    limit: <number>,
    finalize: <finalizeFunction>,
    out: { merge: <collection>, db: <db> }
    }
    )
    db.collection.aggregate( [
    { $match: <queryFilter> },
    { $sort: <sortOrder> },
    { $limit: <number> },
    { $project: { emits: { k: <expression>, v: <expression> } } },
    { $unwind: “$emits” },
    { $group: {
    _id: “$emits.k”},
    value: { $accumulator: {
    init: <initCode>,
    accumulate: <reduceFunction>,
    accumulateArgs: [ “$emit.v”],
    merge: <reduceFunction>,
    finalize: <finalizeFunction>,
    lang: “js” }}
    } },
    { $merge: {
    into: { db: <db>, coll: <collection> },
    on: “_id”
    whenMatched: [
    { $project: {
    value: { $function: {
    body: <reduceFunction>,
    args: [
    “$_id”,
    [ “$value”, “$$new.value” ]
    ],
    lang: “js”
    } }
    } }
    ]
    whenNotMatched: “insert”
    } },
    ] )
    db.collection.mapReduce(
    <mapFunction>,
    <reduceFunction>,
    {
    query: <queryFilter>,
    sort: <sortOrder>,
    limit: <number>,
    finalize: <finalizeFunction>,
    out: { inline: 1 }
    }
    )
    db.collection.aggregate( [
    { $match: <queryFilter> },
    { $sort: <sortOrder> },
    { $limit: <number> },
    { $project: { emits: { k: <expression>, v: <expression> } } },
    { $unwind: “$emits” },
    { $group: {
    _id: “$emits.k”},
    value: { $accumulator: {
    init: <initCode>,
    accumulate: <reduceFunction>,
    accumulateArgs: [ “$emit.v”],
    merge: <reduceFunction>,
    finalize: <finalizeFunction>,
    lang: “js” }}
    } }
    ] )

    例子

    可以使用聚合管道操作符(如$group$merge等)重写各种map-reduce表达式,而不需要自定义函数。但是,为了说明目的,下面的例子提供了两种选择。

    示例1

    通过cust_id对订单集合组执行以下map-reduce操作,并计算每个cust_id的价格总和:

    1. var mapFunction1 = function() {
    2. emit(this.cust_id, this.price);
    3. };
    4. var reduceFunction1 = function(keyCustId, valuesPrices) {
    5. return Array.sum(valuesPrices);
    6. };
    7. db.orders.mapReduce(
    8. mapFunction1,
    9. reduceFunction1,
    10. { out: "map_reduce_example" }
    11. )

    备选方案1:(推荐)您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:

    1. db.orders.aggregate([
    2. { $group: { _id: "$cust_id", value: { $sum: "$price" } } },
    3. { $out: "agg_alternative_1" }
    4. ])

    备选方案2:(仅为说明目的)下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator定义自定义函数:

    1. db.orders.aggregate( [
    2. { $project: { emit: { key: "$cust_id", value: "$price" } } }, // equivalent to the map function
    3. { $group: { // equivalent to the reduce function
    4. _id: "$emit.key",
    5. valuesPrices: { $accumulator: {
    6. init: function() { return 0; },
    7. initArgs: [],
    8. accumulate: function(state, value) { return state + value; },
    9. accumulateArgs: [ "$emit.value" ],
    10. merge: function(state1, state2) { return state1 + state2; },
    11. lang: "js"
    12. } }
    13. } },
    14. { $out: "agg_alternative_2" }
    15. ] )
    1. 首先,$project阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:

      • key包含cust_id文档的值
      • value包含price文档的值
      1. { "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } }
      2. { "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } }
      3. { "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } }
      4. { "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } }
      5. { "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } }
      6. { "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } }
      7. { "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } }
      8. { "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } }
      9. { "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } }
      10. { "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } }
    2. 然后,$group使用$accumulator操作符来添加发出的值:

      1. { "_id" : "Don Quis", "valuesPrices" : 155 }
      2. { "_id" : "Cam Elot", "valuesPrices" : 60 }
      3. { "_id" : "Ant O. Knee", "valuesPrices" : 95 }
      4. { "_id" : "Busby Bee", "valuesPrices" : 125 }
    3. 最后,$out将输出写入集合agg_alternative_2。或者,您可以使用$merge而不是$out

      示例2

    以下字段对orders集合组的map-reduce操作,item.sku并计算每个sku的订单数量和总订购量。然后,该操作将为每个sku值计算每个订单的平均数量,并将结果合并到输出集合中。

    1. var mapFunction2 = function() {
    2. for (var idx = 0; idx < this.items.length; idx++) {
    3. var key = this.items[idx].sku;
    4. var value = { count: 1, qty: this.items[idx].qty };
    5. emit(key, value);
    6. }
    7. };
    8. var reduceFunction2 = function(keySKU, countObjVals) {
    9. reducedVal = { count: 0, qty: 0 };
    10. for (var idx = 0; idx < countObjVals.length; idx++) {
    11. reducedVal.count += countObjVals[idx].count;
    12. reducedVal.qty += countObjVals[idx].qty;
    13. }
    14. return reducedVal;
    15. };
    16. var finalizeFunction2 = function (key, reducedVal) {
    17. reducedVal.avg = reducedVal.qty/reducedVal.count;
    18. return reducedVal;
    19. };
    20. db.orders.mapReduce(
    21. mapFunction2,
    22. reduceFunction2,
    23. {
    24. out: { merge: "map_reduce_example2" },
    25. query: { ord_date: { $gte: new Date("2020-03-01") } },
    26. finalize: finalizeFunction2
    27. }
    28. );

    备选方案1:(推荐)您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:

    1. db.orders.aggregate( [
    2. { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
    3. { $unwind: "$items" },
    4. { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
    5. { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
    6. { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
    7. ] )

    备选方案2:(仅为说明目的)下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator定义自定义函数:

    1. db.orders.aggregate( [
    2. { $match: { ord_date: {$gte: new Date("2020-03-01") } } },
    3. { $unwind: "$items" },
    4. { $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } },
    5. { $group: {
    6. _id: "$emit.key",
    7. value: { $accumulator: {
    8. init: function() { return { count: 0, qty: 0 }; },
    9. initArgs: [],
    10. accumulate: function(state, value) {
    11. state.count += value.count;
    12. state.qty += value.qty;
    13. return state;
    14. },
    15. accumulateArgs: [ "$emit.value" ],
    16. merge: function(state1, state2) {
    17. return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
    18. },
    19. finalize: function(state) {
    20. state.avg = state.qty / state.count;
    21. return state;
    22. },
    23. lang: "js"}
    24. }
    25. } },
    26. { $merge: {
    27. into: "agg_alternative_4",
    28. on: "_id",
    29. whenMatched: "replace",
    30. whenNotMatched: "insert"
    31. } }
    32. ] )
    1. $match阶段只选择那些ord_date大于或等于new Date(“2020-03-01”)的文档。

    2. $unwinds阶段按items数组字段分解文档,为每个数组元素输出一个文档。例如:

      1. { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
      2. { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
      3. { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
      4. { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
      5. { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
      6. { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
      7. { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
      8. { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
      9. ...
    3. $project阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:

      • key包含items.sku
      • value包含具有qty值和count值的文档
      1. { "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } }
      2. { "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } }
      3. { "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } }
      4. { "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
      5. { "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
      6. { "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } }
      7. { "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
      8. { "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
      9. ...
    4. $group使用$accumulator操作符来添加发出的计数和数量,并计算avg字段:

      1. { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
      2. { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
      3. { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
      4. { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
      5. { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
    5. 最后,$merge将输出写入集合agg_alternative_4。如果现有文档具有与新结果相同的键_id,则操作将覆盖现有文档。如果没有具有相同密钥的现有文档,操作将插入该文档。

    也可以看看
    聚合命令比较

    译者:李冠飞

    校对:

    参见

    原文 - Map-Reduce to Aggregation Pipeline