聚合查询 (Aggregation)

info sdk 1.6 开始支持,目前仅内测用户可以使用,该功能可能 ——

  • 不稳定
  • 被移除
  • 在未来配套不同的计费策略

请勿在生产上大规模使用。

基本概念

聚合操作主要用于对数据的批量处理,往往将记录按条件分组以后,然后再进行一系列操作,例如,求最大值、最小值、平均值,求和等操作。聚合操作还能够对记录进行复杂的操作,可以用于数理统计和数据挖掘。

该功能对数据库资源耗占比普通查询高出几个数量级,建议在做数据分析等离线计算场景下使用,配合云函数输出统计报表等。

管道 pipeline

管道在Unix和Linux中一般用于将当前命令的输出结果作为下一个命令的参数。 数据集在一个管道处理完毕后将结果传递给下一个管道处理。管道操作是可以重复的。

阶段 stage

一般聚合查询分为多个步骤,每个步骤称为 stage。

例如:aggregation.match(…).count(‘count’) 分为两个 Stage,第一个 Stage 先筛选匹配的数据,第二个 Stage 统计上个操作返回的数据总量。

操作符 operator

在 stage 中可以使用 Operator 来对数据进行操作。

例如:aggregation.group({ _id: “$status”,totalAmount: {$sum: “$amount”}}) 使用了 $sum 操作符来进行叠加操作,统计所有数据行的 amount 字段总和。

这里引用 MongoDB 官方文档的一张图帮助理解 Aggregation : 聚合查询 (Aggregation) - 图1

基础用法

数据源

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 200,
  8. "created_at": 1531979532,
  9. "status": "B",
  10. "updated_at": 1531979532,
  11. }, {
  12. "amount": 100,
  13. "created_at": 1531979523,
  14. "status": "A",
  15. "updated_at": 1531979523,
  16. }]

分组(group)

根据 status 分组,并计算 总和,平均数,最大值,最小值,数量

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. aggregation.group({
  5. _id: "$status",
  6. totalAmount: {$sum: "$amount"},
  7. avgAmount: {$avg: "$amount"},
  8. maxAmount: {$max: "$amount"},
  9. minAmount: {$min: "$amount"},
  10. count: {$sum: 1}
  11. })
  12. // 应用聚合查询对象
  13. Product.setAggregation(aggregation).find().then(res => {
  14. // success
  15. }, err => {
  16. // err
  17. })

{% endifanrxCodeTabs %}

info 注意在操作符中引用到的字段需要添加 $ 前缀

查询结果

  1. [ { _id: 'B',
  2. avgAmount: 200,
  3. count: 1,
  4. id: 'B',
  5. maxAmount: 200,
  6. minAmount: 200,
  7. totalAmount: 200 },
  8. { _id: 'A',
  9. avgAmount: 125,
  10. count: 2,
  11. id: 'A',
  12. maxAmount: 150,
  13. minAmount: 100,
  14. totalAmount: 250 } ]

随机抽取多条数据(sample)

在数据表中随机抽取 2 条数据

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. aggregation.sample(2)
  5. // 应用聚合查询对象
  6. Product.setAggregation(aggregation).find().then(res => {
  7. // success
  8. }, err => {
  9. // err
  10. })

{% endifanrxCodeTabs %}

结果

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 200,
  8. "created_at": 1531979532,
  9. "status": "B",
  10. "updated_at": 1531979532,
  11. }]

计数(count)

统计数据表中一共有多少条数据 {% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. aggregation.count('count') // 定义返回对象中的字段名,这里我们设为 'count'
  5. // 应用聚合查询对象
  6. Product.setAggregation(aggregation).find().then(res => {
  7. // success
  8. }, err => {
  9. // err
  10. })

{% endifanrxCodeTabs %} 结果

  1. { count: 3 }

进阶

知晓云 JSSDK Aggregation 的语法系 MongoDB 原生的 Aggregation 语法的子集,当你发现此文档没有覆盖到你的需求时,可以查阅给出的对应 MongoDB 文档

目前开放了如下 stage

  • match
  • group
  • project
  • sample
  • count
  • skip
  • limit
  • sort

skip、limit、sort 不能直接使用在 pipeline 中,而是通过 TableObject 的 offset()、limit()、orderBy() 来 方法来实现,对应关系如下

TableObject 方法 stage
limit limit
order_by sort
offset skip

它们在 pipeline 中的顺序为 order_by → offset → limit。 offset 的默认值为 0,limit 默认值为 20

info 目前限制 stage 数量最多为 2 个。(skip、sort、limit 不计入总数)

目前开放了如下 operator

  • sum
  • avg
  • max
  • min
  • size (用于数组)
  • add
  • subtract
  • multiply
  • divide

match( queryInstance )

可以用于筛选数据

参数说明

  • queryInstance:BaaS.Query 的实例

示例

数据源

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 200,
  8. "created_at": 1531979532,
  9. "status": "B",
  10. "updated_at": 1531979532,
  11. }, {
  12. "amount": 100,
  13. "created_at": 1531979523,
  14. "status": "A",
  15. "updated_at": 1531979523,
  16. }]

查询 status == “A” 的数据行

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. // 实例化查询对象
  5. var query = new wx.BaaS.Query()
  6. // 设置查询条件
  7. query.compare('status', '=', 'A')
  8. aggregation.match(query)
  9. // 应用聚合查询对象
  10. Product.setAggregation(aggregation).find().then(res => {
  11. // success
  12. }, err => {
  13. // err
  14. })

{% endifanrxCodeTabs %} 查询结果

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 100,
  8. "created_at": 1531979523,
  9. "status": "A",
  10. "updated_at": 1531979523,
  11. }]

info 当 TableObject 设置了 aggregation 时,将会忽略 setQuery 设置的查询参数。

MongoDB 文档参考

group( expression )

将集合中的文档分组,可用于统计结果。

可用操作符

  • sum
  • avg
  • max
  • min

示例

参考基础用法的示例 分组(group)

MongoDB 文档参考

project( expression )

可以实现以下效果

  • 控制字段的显示隐藏
  • 在返回的结果中添加新的字段

可用操作符

  • sum
  • avg
  • max
  • min
  • size
  • add
  • subtract
  • multiply
  • divide

示例

数据源

  1. [{
  2. "amount": 100,
  3. "amount_2": 20000,
  4. "created_at": 1531979520,
  5. "created_by": 3,
  6. "items": [111, 222, 333],
  7. "status": "A",
  8. "updated_at": 1532067374,
  9. }, {
  10. "amount": 200,
  11. "amount_2": 30000,
  12. "created_at": 1531979520,
  13. "created_by": 3,
  14. "items": [444, 555, 666],
  15. "status": "B",
  16. "updated_at": 1532067386,
  17. }, {
  18. "amount": 150,
  19. "amount_2": 10000,
  20. "created_at": 1531979520,
  21. "created_by": 3,
  22. "items": [123, 456, 789],
  23. "status": "A",
  24. "updated_at": 1532067362,
  25. }]

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. // 实例化查询对象
  5. aggregation.project({
  6. _id: false, // 隐藏 id 字段
  7. status: true, // 显示 status 字段
  8. newFields: ['$created_by', '$created_at'], // 在结果中生成新的字段
  9. size: {$size: '$items'}, // 计算 数组类型字段的长度
  10. divide: {
  11. $divide: ['$amount', {$size: '$items'}] // 除法
  12. },
  13. add: {
  14. $add: ['$amount', '$amount_2'] // 加法
  15. },
  16. sum: {
  17. $sum: "$items" // 计算数组元素总和
  18. },
  19. subtract: {
  20. $subtract: ['$amount', '$amount_2'] // 减法
  21. }
  22. }
  23. )
  24. // 应用聚合查询对象
  25. Product.setAggregation(aggregation).find().then(res => {
  26. // success
  27. }, err => {
  28. // err
  29. })

{% endifanrxCodeTabs %} 结果

  1. [ { add: 20100,
  2. divide: 33.333333333333336,
  3. newFields: [ 3, 1531979520 ],
  4. size: 3,
  5. status: 'A',
  6. subtract: -19900,
  7. sum: 666 },
  8. { add: 30200,
  9. divide: 66.66666666666667,
  10. newFields: [ 3, 1531979520 ],
  11. size: 3,
  12. status: 'B',
  13. subtract: -29800,
  14. sum: 1665 },
  15. { add: 10150,
  16. divide: 50,
  17. newFields: [ 3, 1531979520 ],
  18. size: 3,
  19. status: 'A',
  20. subtract: -9850,
  21. sum: 1368 } ]

MongoDB 文档参考

sample ( size )

随机抽取 size 条数据

参数说明

  • size:数量

示例

数据源

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 200,
  8. "created_at": 1531979532,
  9. "status": "B",
  10. "updated_at": 1531979532,
  11. }, {
  12. "amount": 100,
  13. "created_at": 1531979523,
  14. "status": "A",
  15. "updated_at": 1531979523,
  16. }, {
  17. "amount": 101,
  18. "created_at": 1531979523,
  19. "status": "A",
  20. "updated_at": 1531979523,
  21. }, {
  22. "amount": 102,
  23. "created_at": 1531979523,
  24. "status": "A",
  25. "updated_at": 1531979523,
  26. }]

查找 status 为 A 的数据行,再随机抽取 2 条记录

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. // 实例化查询对象
  5. var query = new wx.BaaS.Query()
  6. // 设置查询条件
  7. query.compare('status', '=', 'A')
  8. aggregation.match(query).sample(2)
  9. // 应用聚合查询对象
  10. Product.setAggregation(aggregation).find().then(res => {
  11. // success
  12. }, err => {
  13. // err
  14. })

{% endifanrxCodeTabs %}

结果

  1. [{
  2. "amount": 101,
  3. "created_at": 1531979523,
  4. "status": "A",
  5. "updated_at": 1531979523,
  6. }, {
  7. "amount": 102,
  8. "created_at": 1531979523,
  9. "status": "A",
  10. "updated_at": 1531979523,
  11. }]

MongoDB 文档参考

count( outputFieldName )

统计数据的行数

参数说明

  • outputFieldName:输出的字段名

数据源

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 200,
  8. "created_at": 1531979532,
  9. "status": "B",
  10. "updated_at": 1531979532,
  11. }, {
  12. "amount": 100,
  13. "created_at": 1531979523,
  14. "status": "A",
  15. "updated_at": 1531979523,
  16. }, {
  17. "amount": 101,
  18. "created_at": 1531979523,
  19. "status": "A",
  20. "updated_at": 1531979523,
  21. }, {
  22. "amount": 102,
  23. "created_at": 1531979523,
  24. "status": "A",
  25. "updated_at": 1531979523,
  26. }]

查找 status 为 A 的数据行,并统计数据量 {% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. // 实例化查询对象
  5. var query = new wx.BaaS.Query()
  6. // 设置查询条件
  7. query.compare('status', '=', 'A')
  8. aggregation.match(query).count('count')
  9. // 应用聚合查询对象
  10. Product.setAggregation(aggregation).find().then(res => {
  11. // success
  12. }, err => {
  13. // err
  14. })

{% endifanrxCodeTabs %}

结果

  1. { count: 4 }

MongoDB 文档参考

skip

使用 TableObject 的 offset 方法

数据源

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 200,
  8. "created_at": 1531979532,
  9. "status": "B",
  10. "updated_at": 1531979532,
  11. }, {
  12. "amount": 100,
  13. "created_at": 1531979523,
  14. "status": "A",
  15. "updated_at": 1531979523,
  16. }]

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. // 实例化查询对象
  5. var query = new wx.BaaS.Query()
  6. // 设置查询条件
  7. query.compare('status', '=', 'A')
  8. aggregation.match(query)
  9. // 应用聚合查询对象
  10. Product.setAggregation(aggregation)
  11. .offset(1) // skip 1
  12. .find().then(res => {
  13. // success
  14. }, err => {
  15. // err
  16. })

{% endifanrxCodeTabs %} 结果

  1. [{
  2. "amount": 100,
  3. "created_at": 1531979523,
  4. "status": "A",
  5. "updated_at": 1531979523,
  6. }]

MongoDB 文档参考

sort

使用 TableObject 的 orderBy 方法

数据源

  1. [ { _id: '5b50270c1c597b00179e9f69',
  2. amount: 200,
  3. amount_2: 30000,
  4. created_at: 1531979520,
  5. created_by: 3,
  6. id: '5b50270c1c597b00179e9f69',
  7. items: [ 444, 555, 666 ],
  8. read_perm: [ 'user:*' ],
  9. status: 'B',
  10. updated_at: 1532067386,
  11. write_perm: [ 'user:*' ] },
  12. { _id: '5b5027041c597b00179e9f68',
  13. amount: 100,
  14. amount_2: 20000,
  15. created_at: 1531979520,
  16. created_by: 3,
  17. id: '5b5027041c597b00179e9f68',
  18. items: [ 111, 222, 333 ],
  19. read_perm: [ 'user:*' ],
  20. status: 'A',
  21. updated_at: 1532067374,
  22. write_perm: [ 'user:*' ] },
  23. { _id: '5b5027131c597b00179e9f6a',
  24. amount: 150,
  25. amount_2: 10000,
  26. created_at: 1531979520,
  27. created_by: 3,
  28. id: '5b5027131c597b00179e9f6a',
  29. items: [ 123, 456, 789 ],
  30. read_perm: [ 'user:*' ],
  31. status: 'A',
  32. updated_at: 1532067362,
  33. write_perm: [ 'user:*' ] } ]

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. // 实例化查询对象
  5. var query = new wx.BaaS.Query()
  6. // 设置查询条件
  7. query.compare('status', '=', 'A')
  8. aggregation.match(query).count('count')
  9. // 应用聚合查询对象
  10. Product.setAggregation(aggregation)
  11. .orderBy('-updated_at') // 根据更新时间升序排列,越早更新的排的越前
  12. .find()
  13. .then(res => {
  14. // success
  15. }, err => {
  16. // err
  17. })

{% endifanrxCodeTabs %} 结果

  1. [ { _id: '5b5027041c597b00179e9f68',
  2. amount: 100,
  3. amount_2: 20000,
  4. created_at: 1531979520,
  5. created_by: 3,
  6. id: '5b5027041c597b00179e9f68',
  7. items: [ 111, 222, 333 ],
  8. status: 'A',
  9. updated_at: 1532067374 },
  10. { _id: '5b5027131c597b00179e9f6a',
  11. amount: 150,
  12. amount_2: 10000,
  13. created_at: 1531979520,
  14. created_by: 3,
  15. id: '5b5027131c597b00179e9f6a',
  16. items: [ 123, 456, 789 ],
  17. status: 'A',
  18. updated_at: 1532067362 } ]

MongoDB 文档参考

limit

使用 TableObject 的 limit 方法

数据源

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }, {
  7. "amount": 200,
  8. "created_at": 1531979532,
  9. "status": "B",
  10. "updated_at": 1531979532,
  11. }, {
  12. "amount": 100,
  13. "created_at": 1531979523,
  14. "status": "A",
  15. "updated_at": 1531979523,
  16. }]

{% ifanrxCodeTabs %}

  1. let Product = new wx.BaaS.TableObject(tableName)
  2. // 实例化 aggregation 对象
  3. var aggregation = new wx.BaaS.Aggregation()
  4. // 实例化查询对象
  5. var query = new wx.BaaS.Query()
  6. // 设置查询条件
  7. query.compare('status', '=', 'A')
  8. aggregation.match(query)
  9. // 应用聚合查询对象
  10. Product.setAggregation(aggregation)
  11. .limit(1) // limit 1 限制返回数只有一个
  12. .find().then(res => {
  13. // success
  14. }, err => {
  15. // err
  16. })

{% endifanrxCodeTabs %} 结果

  1. [{
  2. "amount": 150,
  3. "created_at": 1531979539,
  4. "status": "A",
  5. "updated_at": 1531979539,
  6. }]

MongoDB 文档参考