Perform Incremental Map-Reduce
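Map-reduce operations can handle complex aggregation tasks. To perform map-reduce operations, MongoDB provides the mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.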
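To perform an incremental map-reduce:
- Run a map-reduce job over the current collection and output the result to a separate collection.
- When you have more data to process, run subsequent map-reduce jobs with:
  - the query parameter, which specifies conditions that match only the new documents;
  - the out parameter, which specifies the reduce action that merges the new results into the existing output collection.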
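Step 2 gives the same result as rerunning the job over every document only if reduce can be reapplied to its own output. MongoDB requires this of reduce functions:

- The return value must have the same type as the values emitted by map.
- The result must not depend on the order or grouping of the values.

The following minimal in-memory sketch in plain JavaScript (Node.js) checks this property for a simple sum. It is not the MongoDB API. The helpers mapReduce() and reduceInto() and the sample documents are hypothetical.

```javascript
// Minimal in-memory model of incremental map-reduce (not the MongoDB API).
// Hypothetical helpers: mapReduce() models one full run; reduceInto() models
// merging a later run's output with the { reduce: ... } output action.

// Group emitted values by key, then reduce each group.
function mapReduce(docs, map, reduce) {
  const groups = new Map();
  for (const doc of docs) {
    map(doc, (key, value) => {
      if (!groups.has(key)) groups.set(key, []);
      groups.get(key).push(value);
    });
  }
  const out = new Map();
  for (const [key, values] of groups) {
    // A key with a single value keeps it as-is (reduce is not called).
    out.set(key, values.length === 1 ? values[0] : reduce(key, values));
  }
  return out;
}

// Merge new results into existing output by re-reducing matching keys.
function reduceInto(existing, fresh, reduce) {
  const merged = new Map(existing);
  for (const [key, value] of fresh) {
    merged.set(key, merged.has(key) ? reduce(key, [merged.get(key), value]) : value);
  }
  return merged;
}

// Example job: total minutes per user. Summing is associative and commutative.
const map = (doc, emit) => emit(doc.user, doc.minutes);
const reduce = (key, values) => values.reduce((a, b) => a + b, 0);

const day1 = [
  { user: "a", day: 1, minutes: 95 },
  { user: "b", day: 1, minutes: 110 },
  { user: "a", day: 1, minutes: 105 },
];
const day2 = [
  { user: "a", day: 2, minutes: 130 },
  { user: "c", day: 2, minutes: 40 },
];
const all = day1.concat(day2);

// Step 1: full run. Step 2: run only on the new documents and merge.
const initial = mapReduce(day1, map, reduce);
const newDocs = all.filter((doc) => doc.day >= 2); // plays the role of `query`
const incremental = reduceInto(initial, mapReduce(newDocs, map, reduce), reduce);

// Recomputing over every document gives the same totals.
const full = mapReduce(all, map, reduce);
console.log(Object.fromEntries(incremental)); // { a: 330, b: 110, c: 40 }
console.log(Object.fromEntries(full));        // { a: 330, b: 110, c: 40 }
```

A reduce that averaged its inputs instead of summing them would generally break this equality. That is why the example below keeps total_time and count in the reduced value and computes the average only in finalize.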
Consider the following example, where you schedule a map-reduce operation on the usersessions collection to run at the end of each day.
Data Setup
The usersessions collection contains documents that log users' sessions each day, for example:
db.usersessions.insertMany([
   { userid: "a", ts: ISODate('2020-03-03 14:17:00'), length: 95 },
   { userid: "b", ts: ISODate('2020-03-03 14:23:00'), length: 110 },
   { userid: "c", ts: ISODate('2020-03-03 15:02:00'), length: 120 },
   { userid: "d", ts: ISODate('2020-03-03 16:45:00'), length: 45 },
   { userid: "a", ts: ISODate('2020-03-04 11:05:00'), length: 105 },
   { userid: "b", ts: ISODate('2020-03-04 13:14:00'), length: 120 },
   { userid: "c", ts: ISODate('2020-03-04 17:00:00'), length: 130 },
   { userid: "d", ts: ISODate('2020-03-04 15:37:00'), length: 65 }
])
Initial Map-Reduce of Current Collection
Run the first map-reduce operation as follows:
- Define the map function that maps the userid to an object that contains the fields total_time, count, and avg_time:
var mapFunction = function() {
   var key = this.userid;
   var value = {
      total_time: this.length,
      count: 1,
      avg_time: 0
   };
   emit( key, value );
};
- Define the corresponding reduce function with two arguments, key and values, to calculate the total time and the count. The key corresponds to the userid, and values is an array whose elements correspond to the individual objects that mapFunction mapped to that userid.
var reduceFunction = function(key, values) {
   var reducedObject = {
      total_time: 0,
      count: 0,
      avg_time: 0   // computed later by the finalize function
   };
   values.forEach( function(value) {
      reducedObject.total_time += value.total_time;
      reducedObject.count += value.count;
   });
   return reducedObject;
};
- Define the finalize function with two arguments, key and reducedValue. The function modifies the reducedValue document to compute the avg_time field and returns the modified document.
var finalizeFunction = function (key, reducedValue) {
   if (reducedValue.count > 0)
      reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
   return reducedValue;
};
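- Perform map-reduce on the usersessions collection using the mapFunction, reduceFunction, and finalizeFunction functions. Output the results to the collection session_stats. If the session_stats collection already exists, the operation replaces its contents:
db.usersessions.mapReduce(
   mapFunction,
   reduceFunction,
   {
      out: "session_stats",
      finalize: finalizeFunction
   }
)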
- Query the session_stats collection to verify the results:
db.session_stats.find().sort( { _id: 1 } )
The operation returns the following documents:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
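To make the flow of emit, reduce, and finalize concrete, the following sketch replays the initial run in memory with plain JavaScript (Node.js). It is a model under stated assumptions, not the server's implementation:

- emit() and runMapReduce() are hypothetical stand-ins.
- The documents keep only userid and length.
- Reduce is skipped for a key that has a single emitted value, as MongoDB documents for its own map-reduce.

```javascript
// In-memory model of the initial db.usersessions.mapReduce() run (not MongoDB code).
// emit() and runMapReduce() are hypothetical stand-ins for the server's machinery.

let emitted = new Map();
// Global emit() that mapFunction calls; collects the emitted values per key.
function emit(key, value) {
  if (!emitted.has(key)) emitted.set(key, []);
  emitted.get(key).push(value);
}

// The three functions defined in the steps above.
var mapFunction = function () {
  emit(this.userid, { total_time: this.length, count: 1, avg_time: 0 });
};
var reduceFunction = function (key, values) {
  var reducedObject = { total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function (value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
  });
  return reducedObject;
};
var finalizeFunction = function (key, reducedValue) {
  if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  return reducedValue;
};

// Map every document with `this` bound to it, reduce keys that have more than
// one value, then finalize. Returns { _id, value } documents sorted by _id.
function runMapReduce(docs) {
  emitted = new Map();
  docs.forEach((doc) => mapFunction.call(doc));
  return [...emitted.entries()]
    .map(([key, values]) => {
      const reduced = values.length > 1 ? reduceFunction(key, values) : values[0];
      return { _id: key, value: finalizeFunction(key, reduced) };
    })
    .sort((x, y) => (x._id < y._id ? -1 : x._id > y._id ? 1 : 0));
}

const sessions = [
  { userid: "a", length: 95 }, { userid: "b", length: 110 },
  { userid: "c", length: 120 }, { userid: "d", length: 45 },
  { userid: "a", length: 105 }, { userid: "b", length: 120 },
  { userid: "c", length: 130 }, { userid: "d", length: 65 },
];
// First line printed: {"_id":"a","value":{"total_time":200,"count":2,"avg_time":100}}
runMapReduce(sessions).forEach((doc) => console.log(JSON.stringify(doc)));
```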
Subsequent Incremental Map-Reduce
Later, as the usersessions collection grows, you can run additional map-reduce operations. For example, add new documents to the usersessions collection:
db.usersessions.insertMany([
   { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
   { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
   { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
   { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])
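At the end of the day, perform an incremental map-reduce on the usersessions collection, but use the query field to select only the new documents. Output the results to the collection session_stats, but use the reduce action to combine its existing contents with the results of the incremental map-reduce: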
db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);
Query the session_stats collection to verify the results:
db.session_stats.find().sort( { _id: 1 } )
The operation returns the following documents:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
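The reduce output action can be modeled the same way. In the following plain JavaScript (Node.js) sketch, the hypothetical reduceOutput() assumes two things:

- For a key already in session_stats, the stored value and the new value are passed to reduceFunction together.
- finalizeFunction is then applied to the result.

This is consistent with the avg_time values above, but it is a model, not MongoDB's implementation.

```javascript
// In-memory model of out: { reduce: "session_stats" } (not MongoDB code).
// Assumption: matching keys are re-reduced as [storedValue, newValue] and then
// finalized, which is consistent with the results shown above.

var reduceFunction = function (key, values) {
  var reducedObject = { total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function (value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
  });
  return reducedObject;
};
var finalizeFunction = function (key, reducedValue) {
  if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  return reducedValue;
};

// Contents of session_stats after the initial run.
const sessionStats = new Map([
  ["a", { total_time: 200, count: 2, avg_time: 100 }],
  ["b", { total_time: 230, count: 2, avg_time: 115 }],
  ["c", { total_time: 250, count: 2, avg_time: 125 }],
  ["d", { total_time: 110, count: 2, avg_time: 55 }],
]);

// The 2020-03-05 documents selected by `query`. Each user has one new session,
// so each key's new value is simply the object that mapFunction emits for it.
const newDocs = [
  { userid: "a", length: 130 }, { userid: "b", length: 40 },
  { userid: "c", length: 110 }, { userid: "d", length: 100 },
];
const newResults = new Map(
  newDocs.map((doc) => [doc.userid, { total_time: doc.length, count: 1, avg_time: 0 }])
);

// Re-reduce keys that already exist, keep new keys as-is, then finalize.
function reduceOutput(existing, fresh) {
  for (const [key, value] of fresh) {
    const merged = existing.has(key) ? reduceFunction(key, [existing.get(key), value]) : value;
    existing.set(key, finalizeFunction(key, merged));
  }
  return existing;
}

reduceOutput(sessionStats, newResults);
for (const [id, value] of sessionStats) console.log(JSON.stringify({ _id: id, value: value }));
```

reduceFunction ignores the incoming avg_time values, and finalizeFunction recomputes avg_time from total_time and count. As a result, the merged averages come out the same whether or not the stored values had already been finalized.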
Aggregation Alternative
Prerequisite: reset the collection to its original state:
db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", ts: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", ts: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", ts: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", ts: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", ts: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", ts: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", ts: ISODate('2020-03-04 15:37:00'), length: 65 }
])
Using the available aggregation pipeline operators, you can rewrite the map-reduce example without defining custom functions:
db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
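What each stage computes can be modeled in plain JavaScript (Node.js). In this sketch:

- The hypothetical groupAndProject() stands in for $group and $project.
- The hypothetical mergeInto() stands in for $merge with the whenMatched pipeline.
- The second call feeds only the 2020-03-05 sessions, which is what the $match stage added later in this section selects.

```javascript
// Plain JavaScript model of the aggregation pipeline (not MongoDB code).
// groupAndProject() and mergeInto() are hypothetical names.

// $group by userid, then $project the computed fields under `value`.
function groupAndProject(docs) {
  const groups = new Map();
  for (const doc of docs) {
    const g = groups.get(doc.userid) || { total_time: 0, count: 0 };
    g.total_time += doc.length; // total_time: { $sum: "$length" }
    g.count += 1;               // count: { $sum: 1 }
    groups.set(doc.userid, g);
  }
  return [...groups].map(([id, g]) => ({
    _id: id,
    // avg_time: { $avg: "$length" } equals total_time / count within one run.
    value: { total_time: g.total_time, count: g.count, avg_time: g.total_time / g.count },
  }));
}

// $merge into `target`, a Map keyed by _id.
function mergeInto(target, results) {
  for (const doc of results) {
    const old = target.get(doc._id);
    if (!old) {
      target.set(doc._id, doc); // whenNotMatched: "insert"
      continue;
    }
    // whenMatched: each $set expression reads the existing document ($value)
    // and the incoming one ($$new), so avg_time comes from the summed fields.
    const total = old.value.total_time + doc.value.total_time;
    const count = old.value.count + doc.value.count;
    target.set(doc._id, { _id: doc._id, value: { total_time: total, count: count, avg_time: total / count } });
  }
  return target;
}

const sessionStatsAgg = new Map(); // stands in for session_stats_agg

// First run over the initial eight sessions.
mergeInto(sessionStatsAgg, groupAndProject([
  { userid: "a", length: 95 }, { userid: "b", length: 110 },
  { userid: "c", length: 120 }, { userid: "d", length: 45 },
  { userid: "a", length: 105 }, { userid: "b", length: 120 },
  { userid: "c", length: 130 }, { userid: "d", length: 65 },
]));

// Later run over only the 2020-03-05 sessions.
mergeInto(sessionStatsAgg, groupAndProject([
  { userid: "a", length: 130 }, { userid: "b", length: 40 },
  { userid: "c", length: 110 }, { userid: "d", length: 100 },
]));

for (const doc of sessionStatsAgg.values()) console.log(JSON.stringify(doc));
```

Averaging the two stored averages for user a would give (100 + 130) / 2 = 115, weighting one new session as heavily as the two earlier ones. Dividing the summed total_time by the summed count gives 330 / 3 = 110, which is the value the whenMatched pipeline stores.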
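- The $group stage groups the documents by userid and calculates total_time with $sum, count with $sum, and avg_time with $avg. It outputs the following documents:
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
{ "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
{ "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
{ "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }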
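- The $project stage reshapes the output documents to mirror the map-reduce output, which has two fields: _id and value. The stage is optional if you do not need to mirror the _id and value structure. It outputs the following documents: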
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
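- The $merge stage outputs the results to the session_stats_agg collection. If an existing document has the same _id as the new result, the operation applies the specified pipeline to calculate total_time, count, and avg_time from the result and the existing document. If there is no existing document with the same _id in session_stats_agg, the operation inserts the document.
- Query the session_stats_agg collection to verify the results: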
db.session_stats_agg.find().sort( { _id: 1 } )
The operation returns the following documents:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
{ "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
- Add new documents to the usersessions collection:
db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])
- Add a $match stage at the beginning of the pipeline to specify the date filter:
db.usersessions.aggregate([
{ $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
- Query the session_stats_agg collection to verify the results:
db.session_stats_agg.find().sort( { _id: 1 } )
The operation returns the following documents:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
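- Optional. To avoid having to modify the $match date condition of the aggregation pipeline each time you run it, you can wrap the aggregation in a helper function: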
updateSessionStats = function(startDate) {
db.usersessions.aggregate([
{ $match: { ts: { $gte: startDate } } },
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
]);
};
Then, to run it, you just pass the start date to the updateSessionStats() function:
updateSessionStats(ISODate('2020-03-05 00:00:00'))
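updateSessionStats() leaves the choice of startDate to the caller. For a job that runs at the end of each day, one option is to pass the UTC midnight that starts the current day. Below is a minimal sketch using only the standard JavaScript Date API. startOfUtcDay() is a hypothetical helper, not part of MongoDB or the mongo shell. The sketch assumes ts values are compared as UTC instants, as BSON dates are.

```javascript
// Hypothetical helper (not part of MongoDB): return the UTC midnight that
// starts the day containing `now`, for use as updateSessionStats(startDate).
function startOfUtcDay(now) {
  return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()));
}

const runTime = new Date("2020-03-05T23:30:00Z"); // e.g. when the end-of-day job fires
console.log(startOfUtcDay(runTime).toISOString()); // 2020-03-05T00:00:00.000Z
```

The mongo shell provides the same Date object, so the daily call could then be updateSessionStats(startOfUtcDay(new Date())). Because the whenMatched pipeline adds the new totals to the stored ones, each day's documents must be merged only once. Running the job twice with the same startDate would count that day's sessions twice.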
Translator: 李冠飞
Proofreader:
Original: Perform Incremental Map-Reduce
Copyright © 上海锦木信息技术有限公司, all rights reserved, powered by GitBook. File revised: 2020-12-18 11:34:57