使用条件限制
- 只用于 replica sets 和 sharded clusters ,单节点因为没有oplog故不支持。
- 复制协议必须是pv1 存储引擎必须是 WiredTiger
- MongoDB 3.6 版本只实现了集合粒度,4.0版本支持集群及库级别的ChangeStream
- ChangeStream 只发布持久化到大多数(majority-committed)节点的数据变化通知
- 要想在集合上创建ChangeStream游标用户必须对集合具有读权限
- 对于分片集合带有multi:true 的更新操作可能会导致发布孤立文档的变更消息,operationType=invalidate表示无效或非法操作
使用方式
原生
public static void main(String[] args) {
String databaseName = "demo";
String collectionName = "demo";
MongoClient mongoClient = MongoClients.create("mongodb://intbee:mongoPass@127.0.0.1:37017,127.0.0.1:37018,127.0.0.1:37019/?replicaSet=mongos&authSource=admin&authMechanism=SCRAM-SHA-1");
MongoDatabase db = mongoClient.getDatabase(databaseName);
MongoCollection<Document> coll = db.getCollection(collectionName);
// insert事件,update事件,delete事件,replace事件,invalidate事件
List<Bson> pipeline = java.util.Collections.singletonList(Aggregates.match(Filters.or(Document.parse("{}"),
Filters.in("operationType", Arrays.asList("insert", "update","replace", "delete")))));
MongoCursor<ChangeStreamDocument<Document>> cursor = coll.watch(pipeline)
//更新返回全部数据
.fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
while (cursor.hasNext()) {
ChangeStreamDocument<Document> next = cursor.next();
String Operation = next.getOperationType().getValue();
System.out.println(next.getDocumentKey().getObjectId("_id").getValue().toHexString());// insert,replace、delete
System.out.println("Operation : "+Operation);
System.out.println("DocumentKey : "+next.getDocumentKey());
System.out.println("FullDocument : "+next.getFullDocument());// delete or update=null
System.out.println("UpdateDescription : "+next.getUpdateDescription());// update: removedFields=[], updatedFields={"f":31.0}
}
}
Springboot
```java spring: data: mongodb: uri: mongodb://intbee:mongoPass@127.0.0.1:37017,127.0.0.1:37018,127.0.0.1:37019/demo?replicaSet=mongos&authSource=admin&authMechanism=SCRAM-SHA-1 /**- */ package com.qizai.modules.mongodb.config;
import java.util.concurrent.Executor; import java.util.concurrent.Executors;
import org.bson.Document; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest; import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer; import org.springframework.data.mongodb.core.messaging.Message; import org.springframework.data.mongodb.core.messaging.MessageListener; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import org.springframework.data.mongodb.core.query.Criteria;
import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument;
import lombok.Data; import lombok.extern.slf4j.Slf4j;
@Slf4j @Configuration public class MongoListenerConfig {
@Autowired
private MongoProperties mongoProperties;
@Bean
MongoClient mongoClient() {
//解决spring boot 连接replicaSet问题
return MongoClients.create(mongoProperties.getUri());
}
@Bean // 启动开始监听
MessageListenerContainer messageListenerContainer(MongoTemplate template) {
Executor executor = Executors.newSingleThreadExecutor();
return new DefaultMessageListenerContainer(template, executor) {
@Override
public boolean isAutoStartup() {
return true;
}
};
}
@Autowired
private MessageListenerContainer messageListenerContainer;
@EventListener(ApplicationStartedEvent.class) // 应用启动完后开始注册监听请求
public void subscript() {
DocMessageListener docMessageListener = new DocMessageListener();
ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(docMessageListener)
// 指定表
.collection("demo")
.filter(Aggregation.newAggregation(
Aggregation.match(Criteria.where("operationType").in("insert", "update", "replace", "delete"))))
// 更新返回全部数据
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();
messageListenerContainer.register(request, Document.class);
}
// 集合监听处理
class DocMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {
@Override
public void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {
Document obj = message.getBody();
// MongoDataAutoConfiguration
// TODO
log.info("MessageListener : {}", obj);
}
}
}
```