使用条件限制

  1. 只能用于副本集(replica set)和分片集群(sharded cluster);单节点(standalone)部署没有 oplog,因此不支持。
  2. 复制协议必须是 pv1,存储引擎必须是 WiredTiger。
  3. MongoDB 3.6 版本只实现了集合粒度,4.0版本支持集群及库级别的ChangeStream
  4. ChangeStream 只发布持久化到大多数(majority-committed)节点的数据变化通知
  5. 要想在集合上创建ChangeStream游标用户必须对集合具有读权限
  6. 对于分片集合,带有 multi:true 的更新操作可能会导致针对孤立文档(orphaned documents)发布变更消息;operationType=invalidate 表示该 ChangeStream 已失效(例如被监听的集合被删除或重命名),游标随之关闭,并不是指某个操作无效或非法。

    使用方式

    原生

    1. public static void main(String[] args) {
    2. String databaseName = "demo";
    3. String collectionName = "demo";
    4. MongoClient mongoClient = MongoClients.create("mongodb://intbee:mongoPass@127.0.0.1:37017,127.0.0.1:37018,127.0.0.1:37019/?replicaSet=mongos&authSource=admin&authMechanism=SCRAM-SHA-1");
    5. MongoDatabase db = mongoClient.getDatabase(databaseName);
    6. MongoCollection<Document> coll = db.getCollection(collectionName);
    7. // insert事件,update事件,delete事件,replace事件,invalidate事件
    8. List<Bson> pipeline = java.util.Collections.singletonList(Aggregates.match(Filters.or(Document.parse("{}"),
    9. Filters.in("operationType", Arrays.asList("insert", "update","replace", "delete")))));
    10. MongoCursor<ChangeStreamDocument<Document>> cursor = coll.watch(pipeline)
    11. //更新返回全部数据
    12. .fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
    13. while (cursor.hasNext()) {
    14. ChangeStreamDocument<Document> next = cursor.next();
    15. String Operation = next.getOperationType().getValue();
    16. System.out.println(next.getDocumentKey().getObjectId("_id").getValue().toHexString());// insert,replace、delete
    17. System.out.println("Operation : "+Operation);
    18. System.out.println("DocumentKey : "+next.getDocumentKey());
    19. System.out.println("FullDocument : "+next.getFullDocument());// delete or update=null
    20. System.out.println("UpdateDescription : "+next.getUpdateDescription());// update: removedFields=[], updatedFields={"f":31.0}
    21. }
    22. }

    Springboot

```yaml
spring:
  data:
    mongodb:
      uri: mongodb://intbee:mongoPass@127.0.0.1:37017,127.0.0.1:37018,127.0.0.1:37019/demo?replicaSet=mongos&authSource=admin&authMechanism=SCRAM-SHA-1
```

```java
/**
 *
 */
package com.qizai.modules.mongodb.config;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j @Configuration public class MongoListenerConfig {

  1. @Autowired
  2. private MongoProperties mongoProperties;
  3. @Bean
  4. MongoClient mongoClient() {
  5. //解决spring boot 连接replicaSet问题
  6. return MongoClients.create(mongoProperties.getUri());
  7. }
  8. @Bean // 启动开始监听
  9. MessageListenerContainer messageListenerContainer(MongoTemplate template) {
  10. Executor executor = Executors.newSingleThreadExecutor();
  11. return new DefaultMessageListenerContainer(template, executor) {
  12. @Override
  13. public boolean isAutoStartup() {
  14. return true;
  15. }
  16. };
  17. }
  18. @Autowired
  19. private MessageListenerContainer messageListenerContainer;
  20. @EventListener(ApplicationStartedEvent.class) // 应用启动完后开始注册监听请求
  21. public void subscript() {
  22. DocMessageListener docMessageListener = new DocMessageListener();
  23. ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(docMessageListener)
  24. // 指定表
  25. .collection("demo")
  26. .filter(Aggregation.newAggregation(
  27. Aggregation.match(Criteria.where("operationType").in("insert", "update", "replace", "delete"))))
  28. // 更新返回全部数据
  29. .fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();
  30. messageListenerContainer.register(request, Document.class);
  31. }
  32. // 集合监听处理
  33. class DocMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {
  34. @Override
  35. public void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {
  36. Document obj = message.getBody();
  37. // MongoDataAutoConfiguration
  38. // TODO
  39. log.info("MessageListener : {}", obj);
  40. }
  41. }

}

```