- MongoDB官方文档中文版
- MongoDB用户手册
- MongoDB简介
- 安装 MongoDB
- 安装MongoDB社区版
- 安装MongoDB企业版
- 将社区版MongoDB升级到企业版MongoDB
- 验证MongoDB软件包的完整性
- Mongo Shell
- 增删改查操作
- 聚合
- 数据模式
- 数据建模介绍
- 模式验证
- Data Modeling Concepts
- Data Model Examples and Patterns
- Model Relationships Between Documents
- Model One-to-One Relationships with Embedded Documents
- Model One-to-Many Relationships with Embedded Documents
- Model One-to-Many Relationships with Document References
- Model Tree Structures
- Model Tree Structures with Parent References
- Model Tree Structures with Child References
- Model Tree Structures with an Array of Ancestors
- Model Tree Structures with Materialized Paths
- Model Tree Structures with Nested Sets
- Model Specific Application Contexts
- Model Data for Atomic Operations
- Model Data to Support Keyword Search
- Model Data for Schema Versioning
- Model Monetary Data
- Model Time Data
- Model Computed Data
- Data Model Reference
- 事务
- 索引
- Single Field Indexes
- Compound Indexes
- Multikey Indexes
- Text Indexes
- Wildcard Indexes
- 2dsphere Indexes
- 2d Indexes
- geoHaystack Indexes
- Hashed Indexes
- 索引特性
- Index Builds on Populated Collections
- 索引交集
- Manage Indexes
- 衡量索引使用
- Indexing Strategies
- Indexing Reference
- 2d索引
- 2dsphere索引
- 复合索引
- geoHaystack索引
- Hashed 索引
- 在填充的集合上建立索引
- 索引参考
- 索引策略
- 管理索引
- 多键索引
- 单字段索引
- 文本索引
- 通配符索引
- 安全
- 安全检查列表
- Enable Access Control
- 身份验证
- Users
- Add Users
- Authentication Mechanisms
- SCRAM
- x.509
- Enterprise Authentication Mechanisms
- Kerberos Authentication
- LDAP Proxy Authentication
- LDAP Authorization
- Internal/Membership Authentication
- Deploy Replica Set With Keyfile Authentication
- Update Replica Set to Keyfile Authentication
- Update Replica Set to Keyfile Authentication (No Downtime)
- Rotate Keys for Replica Sets
- Deploy Sharded Cluster with Keyfile Authentication
- Update Sharded Cluster to Keyfile Authentication
- Update Sharded Cluster to Keyfile Authentication (No Downtime)
- Rotate Keys for Sharded Clusters
- Use x.509 Certificate for Membership Authentication
- Upgrade from Keyfile Authentication to x.509 Authentication
- Rolling Update of x.509 Cluster Certificates that Contain New DN
- Role-Based Access Control
- TLS/SSL (Transport Encryption)
- Encryption at Rest
- Client-Side Field Level Encryption
- 审计
- Network and Configuration Hardening
- Implement Field Level Redaction
- Security Reference
- Create a Vulnerability Report
- Appendix
- 附录
- 启用访问控制
- 网络和配置强化
- 安全参考
- 改变流
- 复制集
- 复制集成员
- 副本集日志
- Replica Set Data Synchronization
- 副本集部署架构
- Replica Set High Availability
- Replica Set Read and Write Semantics
- Replica Set Deployment Tutorials
- Member Configuration Tutorials
- Replica Set Maintenance Tutorials
- Change the Size of the Oplog
- Perform Maintenance on Replica Set Members
- Force a Member to Become Primary
- Resync a Member of a Replica Set
- Configure Replica Set Tag Sets
- Reconfigure a Replica Set with Unavailable Members
- Manage Chained Replication
- Change Hostnames in a Replica Set
- Configure a Secondary’s Sync Target
- Replication Reference
- 副本集数据同步
- 分片
- 分片集群组成]
- 片键
- 哈希分片
- 范围分片
- 部署分片集群
- 区域
- Data Partitioning with Chunks
- Balancer
- Administration
- Config Server Administration
- Replace a Config Server
- View Cluster Configuration
- Restart a Sharded Cluster
- Migrate a Sharded Cluster to Different Hardware
- Add Shards to a Cluster
- Remove Shards from an Existing Sharded Cluster
- Clear jumbo Flag
- Back Up Cluster Metadata
- Convert Sharded Cluster to Replica Set
- Convert a Replica Set to a Sharded Cluster
- Convert a Shard Standalone to a Shard Replica Set
- Sharding Reference
- 分片键
- Zone
- 管理权限
- 产品说明
- Operations Checklist
- Development Checklist
- Performance
- 配置和维护
- Data Center Awareness
- MongoDB Backup Methods
- Back Up and Restore with Filesystem Snapshots
- Back Up and Restore with MongoDB Tools
- Restore a Replica Set from MongoDB Backups
- Backup and Restore Sharded Clusters
- Back Up a Sharded Cluster with File System Snapshots
- Back Up a Sharded Cluster with Database Dumps
- Schedule Backup Window for Sharded Clusters
- Restore a Sharded Cluster
- Recover a Standalone after an Unexpected Shutdown
- Monitoring for MongoDB
- 开发检查表
- MogoDB 备份方法
- 操作检查表
- MongoDB性能
- 存储
- 常见问题
- 参考
- Operators
- Query and Projection Operators
- Comparison Query Operators
- Logical Query Operators
- Element Query Operators
- Evaluation Query Operators
- Geospatial Query Operators
- Array Query Operators
- Bitwise Query Operators
- $comment
- 查询与映射运算符
- 更新运算符
- Field Update Operators
- Array Update Operators
- Bitwise Update Operator
- Aggregation Pipeline Stages
- $addFields (aggregation)
- $bucket (aggregation)
- $bucketAuto (aggregation)
- $collStats (aggregation)
- $count (aggregation)
- $currentOp (aggregation)
- $facet (aggregation)
- $geoNear (aggregation)
- $graphLookup (aggregation)
- $group (aggregation)
- $indexStats (aggregation)
- $limit (aggregation)
- $listLocalSessions
- $listSessions
- $lookup (aggregation)
- $match (aggregation)
- $merge (aggregation)
- $out (aggregation)
- $planCacheStats
- $project (aggregation)
- $redact (aggregation)
- $replaceRoot (aggregation)
- $replaceWith (aggregation)
- $sample (aggregation)
- $set (aggregation)
- $skip (aggregation)
- $sort (aggregation)
- $sortByCount (aggregation)
- $unionWith (aggregation)
- $unset (aggregation)
- $unwind (aggregation)
- Aggregation Pipeline Operators
- $sin (aggregation)
- $abs (aggregation)
- $slice (aggregation)
- $accumulator (aggregation)
- $split (aggregation)
- $acos (aggregation)
- $sqrt (aggregation)
- $acosh (aggregation)
- $add (aggregation)
- $addToSet (aggregation)
- $allElementsTrue (aggregation)
- $and (aggregation)
- $anyElementTrue (aggregation)
- $arrayElemAt (aggregation)
- $arrayToObject (aggregation)
- $asin (aggregation)
- $asinh (aggregation)
- $sum (aggregation)
- $atan (aggregation)
- $atan2 (aggregation)
- $tan (aggregation)
- $atanh (aggregation)
- $avg (aggregation)
- $binarySize (aggregation)
- $bsonSize (aggregation)
- $ceil (aggregation)
- $toInt (aggregation)
- $cmp (aggregation)
- $concat (aggregation)
- $concatArrays (aggregation)
- $cond (aggregation)
- $convert (aggregation)
- $cos (aggregation)
- $dateFromParts (aggregation)
- $dateToParts (aggregation)
- $type (aggregation)
- $dateFromString (aggregation)
- $week (aggregation)
- $dateToString (aggregation)
- $year (aggregation)
- $dayOfMonth (aggregation)
- $zip (aggregation)
- $dayOfWeek (aggregation)
- $dayOfYear (aggregation)
- $degreesToRadians (aggregation)
- $divide (aggregation)
- $eq (aggregation)
- $exp (aggregation)
- $filter (aggregation)
- $first (aggregation accumulator)
- $first (aggregation)
- $floor (aggregation)
- $function (aggregation)
- $gt (aggregation)
- $gte (aggregation)
- $hour (aggregation)
- $ifNull (aggregation)
- $in (aggregation)
- $indexOfArray (aggregation)
- $indexOfBytes (aggregation)
- $indexOfCP (aggregation)
- $isArray (aggregation)
- $isNumber (aggregation)
- $isoDayOfWeek (aggregation)
- $isoWeek (aggregation)
- $isoWeekYear (aggregation)
- $last (aggregation accumulator)
- $last (aggregation)
- $let (aggregation)
- $literal (aggregation)
- $ln (aggregation)
- $log (aggregation)
- $log10 (aggregation)
- $lt (aggregation)
- $lte (aggregation)
- $trim (aggregation)
- $map (aggregation)
- $max (aggregation)
- $mergeObjects (aggregation)
- $meta
- $min (aggregation)
- $millisecond (aggregation)
- $minute (aggregation)
- $mod (aggregation)
- $month (aggregation)
- $multiply (aggregation)
- $ne (aggregation)
- $not (aggregation)
- $objectToArray (aggregation)
- $or (aggregation)
- $pow (aggregation)
- $push (aggregation)
- $radiansToDegrees (aggregation)
- $range (aggregation)
- $reduce (aggregation)
- $regexFind (aggregation)
- $regexFindAll (aggregation)
- $regexMatch (aggregation)
- $replaceOne (aggregation)
- $replaceAll (aggregation)
- $reverseArray (aggregation)
- $round (aggregation)
- $rtrim (aggregation)
- $second (aggregation)
- $setDifference (aggregation)
- $setEquals (aggregation)
- $setIntersection (aggregation)
- $setIsSubset (aggregation)
- $setUnion (aggregation)
- $size (aggregation)
- $slice (aggregation)
- $split (aggregation)
- $sqrt (aggregation)
- $stdDevPop (aggregation)
- $stdDevSamp (aggregation)
- $strcasecmp (aggregation)
- $strLenBytes (aggregation)
- $strLenCP (aggregation)
- $substr (aggregation)
- $substrBytes (aggregation)
- $substrCP (aggregation)
- $subtract (aggregation)
- $sum (aggregation)
- $switch (aggregation)
- $tan (aggregation)
- $toBool (aggregation)
- $toDate (aggregation)
- $toDecimal (aggregation)
- $toDouble(aggregation)
- $toInt (aggregation)
- $toLong (aggregation)
- $toObjectId (aggregation)
- $toString (aggregation)
- $toLower (aggregation)
- $toUpper (aggregation)
- $trim (aggregation)
- $trunc (aggregation)
- $type (aggregation)
- $week (aggregation)
- $year (aggregation)
- $zip (aggregation)
- 查询修饰符
- $comment
- $explain
- $hint
- $max
- $maxTimeMS
- $min
- $orderby
- $query
- $returnKey
- $showDiskLoc
- $natural
- 聚合管道操作符
- $abs (aggregation)
- $acos (aggregation)
- $acosh (aggregation)
- $add (aggregation)
- $addToSet (aggregation)
- $and (aggregation)
- $anyElementTrue (aggregation)
- $arrayElemAt (aggregation)
- $arrayToObject (aggregation)
- $asin (aggregation)
- $asinh (aggregation)
- $atan (aggregation)
- $atan2 (aggregation)
- $atanh (aggregation)
- $avg (aggregation)
- $ceil (aggregation)
- $cmp (aggregation)
- $concat (aggregation)
- $concatArrays (aggregation)
- $cond (aggregation)
- $convert (aggregation)
- $cos (aggregation)
- $dateFromParts (aggregation)
- $dateFromString (aggregation)
- $dateToParts (aggregation)
- $dateToString (aggregation)
- $literal (aggregation)
- 聚合管道阶段
- 数据库命令
- Aggregation Commands
- aggregate
- count
- distinct
- mapReduce
- Geospatial Commands
- geoSearch
- Query and Write Operation Commands
- delete
- find
- findAndModify
- getLastError
- getMore
- insert
- resetError
- update
- 查询计划缓存命令
- planCacheClear
- planCacheClearFilters
- planCacheListFilters
- planCacheSetFilter
- 认证命令
- authenticate
- getnonce
- logout
- User Management Commands
- createUser
- dropAllUsersFromDatabase
- dropUser
- grantRolesToUser
- revokeRolesFromUser
- updateUser
- usersInfo
- Role Management Commands
- createRole
- dropRole
- dropAllRolesFromDatabase
- grantPrivilegesToRole
- grantRolesToRole
- invalidateUserCache
- revokePrivilegesFromRole
- revokeRolesFromRole
- rolesInfo
- updateRole
- Replication Commands
- applyOps
- isMaster
- replSetAbortPrimaryCatchUp
- replSetFreeze
- replSetGetConfig
- replSetGetStatus
- replSetInitiate
- replSetMaintenance
- replSetReconfig
- replSetResizeOplog
- replSetStepDown
- replSetSyncFrom
- Sharding Commands
- addShard
- addShardToZone
- balancerCollectionStatus
- balancerStart
- balancerStatus
- balancerStop
- checkShardingIndex
- clearJumboFlag
- cleanupOrphaned
- enableSharding
- flushRouterConfig
- getShardMap
- getShardVersion
- isdbgrid
- listShards
- medianKey
- moveChunk
- movePrimary
- mergeChunks
- refineCollectionShardKey
- removeShard
- removeShardFromZone
- setShardVersion
- shardCollection
- shardingState
- split
- splitChunk
- splitVector
- unsetSharding
- updateZoneKeyRange
- Sessions Commands
- abortTransaction
- commitTransaction
- endSessions
- killAllSessions
- killAllSessionsByPattern
- killSessions
- refreshSessions
- startSession
- Administration Commands
- cloneCollectionAsCapped
- collMod
- compact
- connPoolSync
- convertToCapped
- create
- createIndexes
- currentOp
- drop
- dropDatabase
- dropConnections
- dropIndexes
- filemd5
- fsync
- fsyncUnlock
- getDefaultRWConcern
- getParameter
- killCursors
- killOp
- listCollections
- listDatabases
- listIndexes
- logRotate
- reIndex
- renameCollection
- setFeatureCompatibilityVersion
- setIndexCommitQuorum
- setParameter
- setDefaultRWConcern
- shutdown
- Diagnostic Commands
- availableQueryOptions
- buildInfo
- collStats
- connPoolStats
- connectionStatus
- cursorInfo
- dataSize
- dbHash
- dbStats
- diagLogging
- driverOIDTest
- explain
- features
- getCmdLineOpts
- getLog
- hostInfo
- isSelf
- listCommands
- lockInfo
- netstat
- ping
- profile
- serverStatus
- shardConnPoolStats
- top
- validate
- whatsmyuri
- 免费监控命令
- getFreeMonitoringStatus
- setFreeMonitoring
- 数据库命令
- logApplicationMessage
- 管理命令
- 聚合命令
- 诊断命令
- 地理空间命令
- 查询和写操作命令
- 复制命令
- 角色管理命令
- 会话命令
- 分片命令
- 用户管理命令
- mongo Shell 方法
- Collection Methods
- db.collection.aggregate()
- db.collection.bulkWrite()
- db.collection.copyTo()
- db.collection.count()
- db.collection.countDocuments()
- db.collection.estimatedDocumentCount()
- db.collection.createIndex()
- db.collection.createIndexes()
- db.collection.dataSize()
- db.collection.deleteOne()
- db.collection.deleteMany()
- db.collection.distinct()
- db.collection.drop()
- db.collection.dropIndex()
- db.collection.dropIndexes()
- db.collection.ensureIndex()
- db.collection.explain()
- db.collection.find()
- db.collection.findAndModify()
- db.collection.findOne()
- db.collection.findOneAndDelete()
- db.collection.findOneAndReplace()
- db.collection.findOneAndUpdate()
- db.collection.getIndexes()
- db.collection.getShardDistribution()
- db.collection.getShardVersion()
- db.collection.hideIndex()
- db.collection.insert()
- db.collection.insertOne()
- db.collection.insertMany()
- db.collection.isCapped()
- db.collection.latencyStats()
- db.collection.mapReduce()
- db.collection.reIndex()
- db.collection.remove()
- db.collection.renameCollection()
- db.collection.replaceOne()
- db.collection.save()
- db.collection.stats()
- db.collection.storageSize()
- db.collection.totalIndexSize()
- db.collection.totalSize()
- db.collection.unhideIndex()
- db.collection.update()
- db.collection.updateOne()
- db.collection.updateMany()
- db.collection.watch()
- db.collection.validate()
- Cursor Methods
- cursor.addOption()
- cursor.allowDiskUse()
- cursor.allowPartialResults()
- cursor.batchSize()
- cursor.close()
- cursor.isClosed()
- cursor.collation()
- cursor.comment()
- cursor.count()
- cursor.explain()
- cursor.forEach()
- cursor.hasNext()
- cursor.hint()
- cursor.isExhausted()
- cursor.itcount()
- cursor.limit()
- cursor.map()
- cursor.max()
- cursor.maxTimeMS()
- cursor.min()
- cursor.next()
- cursor.noCursorTimeout()
- cursor.objsLeftInBatch()
- cursor.pretty()
- cursor.readConcern()
- cursor.readPref()
- cursor.returnKey()
- cursor.showRecordId()
- cursor.size()
- cursor.skip()
- cursor.sort()
- cursor.tailable()
- cursor.toArray()
- Database Methods
- db.adminCommand()
- db.aggregate()
- db.cloneDatabase()
- db.commandHelp()
- db.copyDatabase()
- db.createCollection()
- db.createView()
- db.currentOp()
- db.dropDatabase()
- db.eval()
- db.fsyncLock()
- db.fsyncUnlock()
- db.getCollection()
- db.getCollectionInfos()
- db.getCollectionNames()
- db.getLastError()
- db.getLastErrorObj()
- db.getLogComponents()
- db.getMongo()
- db.getName()
- db.getProfilingLevel()
- db.getProfilingStatus()
- db.getReplicationInfo()
- db.getSiblingDB()
- db.help()
- db.hostInfo()
- db.isMaster()
- db.killOp()
- db.listCommands()
- db.logout()
- db.printCollectionStats()
- db.printReplicationInfo()
- db.printShardingStatus()
- db.printSlaveReplicationInfo()
- db.resetError()
- db.runCommand()
- db.serverBuildInfo()
- db.serverCmdLineOpts()
- db.serverStatus()
- db.setLogLevel()
- db.setProfilingLevel()
- db.shutdownServer()
- db.stats()
- db.version()
- db.watch()
- Query Plan Cache Methods
- db.collection.getPlanCache()
- PlanCache.clear()
- PlanCache.clearPlansByQuery()
- PlanCache.help()
- PlanCache.list()
- Bulk Operation Methods
- db.collection.initializeOrderedBulkOp()
- db.collection.initializeUnorderedBulkOp()
- Bulk()
- Bulk.execute()
- Bulk.find()
- Bulk.find.arrayFilters()
- Bulk.find.collation()
- Bulk.find.hint()
- Bulk.find.remove()
- Bulk.find.removeOne()
- Bulk.find.replaceOne()
- Bulk.find.updateOne()
- Bulk.find.update()
- Bulk.find.upsert()
- Bulk.getOperations()
- Bulk.insert()
- Bulk.tojson()
- Bulk.toString()
- User Management Methods
- db.auth()
- db.changeUserPassword()
- db.createUser()
- db.dropUser()
- db.dropAllUsers()
- db.getUser()
- db.getUsers()
- db.grantRolesToUser()
- db.removeUser()
- db.revokeRolesFromUser()
- db.updateUser()
- passwordPrompt()
- Role Management Methods
- db.createRole()
- db.dropRole()
- db.dropAllRoles()
- db.getRole()
- db.getRoles()
- db.grantPrivilegesToRole()
- db.revokePrivilegesFromRole()
- db.grantRolesToRole()
- db.revokeRolesFromRole()
- db.updateRole()
- Replication Methods
- rs.add()
- rs.addArb()
- rs.conf()
- rs.freeze()
- rs.help()
- rs.initiate()
- rs.printReplicationInfo()
- rs.printSlaveReplicationInfo()
- rs.reconfig()
- rs.remove()
- rs.status()
- rs.stepDown()
- rs.syncFrom()
- Sharding Methods
- sh.addShard()
- sh.addShardTag()
- sh.addShardToZone()
- sh.addTagRange()
- sh.balancerCollectionStatus()
- sh.disableBalancing()
- sh.enableBalancing()
- sh.disableAutoSplit
- sh.enableAutoSplit
- sh.enableSharding()
- sh.getBalancerHost()
- sh.getBalancerState()
- sh.removeTagRange()
- sh.removeRangeFromZone()
- sh.help()
- sh.isBalancerRunning()
- sh.moveChunk()
- sh.removeShardTag()
- sh.removeShardFromZone()
- sh.setBalancerState()
- sh.shardCollection()
- sh.splitAt()
- sh.splitFind()
- sh.startBalancer()
- sh.status()
- sh.stopBalancer()
- sh.waitForBalancer()
- sh.waitForBalancerOff()
- sh.waitForPingChange()
- sh.updateZoneKeyRange()
- convertShardKeyToHashed
- Free Monitoring Methods
- db.disableFreeMonitoring()
- db.enableFreeMonitoring()
- db.getFreeMonitoringStatus
- Object Constructors and Methods
- BulkWriteResult()
- Date()
- ObjectId
- ObjectId.getTimestamp()
- ObjectId.toString()
- ObjectId.valueOf()
- UUID()
- WriteResult()
- WriteResult.hasWriteError()
- WriteResult.hasWriteConcernError()
- Connection Methods
- connect()
- Mongo()
- Mongo.getDB()
- Mongo.getReadPrefMode()
- Mongo.getReadPrefTagSet()
- Mongo.isCausalConsistency()
- Mongo.setCausalConsistency()
- Mongo.setReadPref()
- Mongo.startSession()
- Mongo.watch()
- Session
- SessionOptions
- Native Methods
- cat()
- cd()
- copyDbpath()
- getHostName()
- getMemInfo()
- hostname()
- isInteractive()
- listFiles()
- load()
- ls()
- md5sumFile()
- mkdir()
- pwd()
- quit()
- removeFile()
- resetDbpath()
- sleep()
- setVerboseShell()
- version()
- _isWindows()
- _rand()
- Client-Side Field Level Encryption Methods
- getKeyVault()
- KeyVault.createKey()
- KeyVault.deleteKey()
- KeyVault.getKey()
- KeyVault.getKeys()
- KeyVault.addKeyAlternateName()
- KeyVault.removeKeyAlternateName()
- KeyVault.getKeyByAltName()
- getClientEncryption()
- ClientEncryption.encrypt()
- ClientEncryption.decrypt()
- mongo Shell 方法
- MongoDB Package Components
- Configuration File Options
- MongoDB Server Parameters
- MongoDB Limits and Thresholds
- Explain Results
- System Collections
- 连接字符串URI格式
- 排序
- MongoDB的Wire协议
- 日志消息
- Exit Codes and Statuses
- 词汇表
- 默认的MongoDB端口
- Default MongoDB Read Concerns/Write Concerns
- 服务器会话
- Configuration File Options
- 默认的MongoDB读/写关注
- 退出代码和状态
- MongoDB Limits and Thresholds
- Operators
- 更新说明
- Release Notes for MongoDB 4.4
- Release Notes for MongoDB 4.2
- Release Notes for MongoDB 4.0
- Release Notes for MongoDB 3.6
- Release Notes for MongoDB 3.4
- Release Notes for MongoDB 3.2
- Release Notes for MongoDB 3.0
- Release Notes for MongoDB 2.6
- Release Notes for MongoDB 2.4
- Release Notes for MongoDB 2.2
- Release Notes for MongoDB 2.0
- Release Notes for MongoDB 1.8
- Release Notes for MongoDB 1.6
- Release Notes for MongoDB 1.4
- Release Notes for MongoDB 1.2.x
- MongoDB Versioning
- 技术支持
- 开始使用MongoDB开发
- 联系我们
- 更多资料
- 本书使用 GitBook 发布
Map-Reduce转换到聚合管道
从4.4版本开始,MongoDB添加了$accumulator
和$function
aggregation运算符。这些运算符为用户提供了定义自定义聚合表达式的能力。使用这些操作,可以大致重写map-reduce表达式,如下表所示。
注意
可以使用聚合管道操作符(如$group、$merge等)重写各种map-reduce表达式,而不需要自定义函数。
例如,请参见map-reduce示例。
Map-Reduce到聚合管道转换表
这张表只是粗略的翻译。例如,该表显示了使用$project
的mapFunction
的近似转换。
- 然而,mapFunction逻辑可能需要额外的阶段,例如,如果逻辑包括对数组的迭代:
function() {
this.items.forEach(function(item){ emit(item.sku, 1); });
}
然后,聚合管道包括一个$unwind
和一个$project
:
{ $unwind: "$items "},
{ $project: { emits: { key: { "$items.sku" }, value: 1 } } },
$project
中的emit
字段可以被命名为其他名称。为了进行可视化比较,选择了字段名称emit。
Map-Reduce | Aggregation Pipeline |
---|---|
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: <collection> } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $out: <collection> } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $out: { db: <db>, coll: <collection> } } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $merge: { into: { db: <db>, coll: <collection>}, on: “_id” whenMatched: “replace”, whenNotMatched: “insert” } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $merge: { into: { db: <db>, coll: <collection> }, on: “_id” whenMatched: [ { $project: { value: { $function: { body: <reduceFunction>, args: [ “$_id”, [ “$value”, “$$new.value” ] ], lang: “js” } } } } ] whenNotMatched: “insert” } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { inline: 1 } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } } ] ) |
例子
可以使用聚合管道操作符(如$group
、$merge
等)重写各种map-reduce表达式,而不需要自定义函数。但是,为了说明目的,下面的例子提供了两种选择。
示例1
通过cust_id
对订单集合组执行以下map-reduce
操作,并计算每个cust_id
的价格总和:
var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)
备选方案1:(推荐)您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:
db.orders.aggregate([
{ $group: { _id: "$cust_id", value: { $sum: "$price" } } },
{ $out: "agg_alternative_1" }
])
备选方案2:(仅为说明目的)下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator
定义自定义函数:
db.orders.aggregate( [
{ $project: { emit: { key: "$cust_id", value: "$price" } } }, // equivalent to the map function
{ $group: { // equivalent to the reduce function
_id: "$emit.key",
valuesPrices: { $accumulator: {
init: function() { return 0; },
initArgs: [],
accumulate: function(state, value) { return state + value; },
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) { return state1 + state2; },
lang: "js"
} }
} },
{ $out: "agg_alternative_2" }
] )
首先,
$project
阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:key
包含cust_id
文档的值value
包含price
文档的值
{ "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } }
{ "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } }
{ "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } }
{ "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } }
{ "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } }
{ "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } }
{ "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } }
{ "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } }
{ "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } }
{ "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } }
- 然后,
$group
使用$accumulator
操作符来添加发出的值:
{ "_id" : "Don Quis", "valuesPrices" : 155 }
{ "_id" : "Cam Elot", "valuesPrices" : 60 }
{ "_id" : "Ant O. Knee", "valuesPrices" : 95 }
{ "_id" : "Busby Bee", "valuesPrices" : 125 }
- 最后,
$out
将输出写入集合agg_alternative_2
。或者,您可以使用$merge
而不是$out
。
示例2
以下字段对orders
集合组的map-reduce操作,item.sku
并计算每个sku的订单数量和总订购量。然后,该操作将为每个sku值计算每个订单的平均数量,并将结果合并到输出集合中。
var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = { count: 1, qty: this.items[idx].qty };
emit(key, value);
}
};
var reduceFunction2 = function(keySKU, countObjVals) {
reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};
var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};
db.orders.mapReduce(
mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example2" },
query: { ord_date: { $gte: new Date("2020-03-01") } },
finalize: finalizeFunction2
}
);
备选方案1:(推荐)您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:
db.orders.aggregate( [
{ $match: { ord_date: { $gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
{ $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
{ $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
备选方案2:(仅为说明目的)下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator
定义自定义函数:
db.orders.aggregate( [
{ $match: { ord_date: {$gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } },
{ $group: {
_id: "$emit.key",
value: { $accumulator: {
init: function() { return { count: 0, qty: 0 }; },
initArgs: [],
accumulate: function(state, value) {
state.count += value.count;
state.qty += value.qty;
return state;
},
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) {
return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
},
finalize: function(state) {
state.avg = state.qty / state.count;
return state;
},
lang: "js"}
}
} },
{ $merge: {
into: "agg_alternative_4",
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert"
} }
] )
$match
阶段只选择那些ord_date大于或等于new Date("2020-03-01")的文档。$unwinds
阶段按items数组字段分解文档,为每个数组元素输出一个文档。例如:
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
...
$project
阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:key
包含items.sku
值value
包含具有qty
值和count
值的文档
{ "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } }
{ "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
...
$group
使用$accumulator
操作符来添加发出的计数和数量,并计算avg字段:
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
- 最后,
$merge
将输出写入集合agg_alternative_4
。如果现有文档具有与新结果相同的键_id,则操作将覆盖现有文档。如果没有具有相同密钥的现有文档,操作将插入该文档。
也可以看看
聚合命令比较
译者:李冠飞
校对:
参见
原文 - Map-Reduce to Aggregation Pipeline
Copyright © 上海锦木信息技术有限公司 all right reserved,powered by Gitbook文件修订时间: 2020-12-18 11:34:57