Source
package com.donson.uitls;import com.mongodb.MongoClient;import com.mongodb.MongoClientURI;import com.mongodb.client.FindIterable;import com.mongodb.client.MongoCollection;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.bson.Document;import java.util.Properties;public class MongoSource extends RichSourceFunction<String> { private MongoClient mongoClient; private Properties properties; private MongoCollection<Document> collection; public MongoSource(Properties properties) { this.properties = properties; } @Override public void open(Configuration parameters) throws Exception { /*链接数据库*/ mongoClient = new MongoClient(new MongoClientURI(properties.getProperty("uri"))); collection = mongoClient.getDatabase(properties.getProperty("dbName")).getCollection(properties.getProperty("collectionName")); } @Override public void run(SourceContext<String> ctx) throws Exception { /*读取数据*/ FindIterable<Document> documents = collection.find().batchSize(10); for(Document document: documents) { ctx.collect(document.toJson()); } } @Override public void cancel() { if (this.mongoClient !=null) { this.mongoClient.close(); } }}
Sink
public static class InsertData extends RichSinkFunction<JSONObject> { private MongoClient mongoClient; private Properties properties; private MongoCollection<Document> collection; public InsertData(Properties properties) { this.properties = properties; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); mongoClient = new MongoClient(new MongoClientURI(properties.getProperty("uri"))); collection = mongoClient.getDatabase(properties.getProperty("dbName")).getCollection(properties.getProperty("collectionName")); } @Override public void invoke(JSONObject value, Context context) throws Exception { Document doc = new Document(); doc.append("author_id", value.getString("author_id")); doc.append("name", value.getString("name")); // collection.insertOne(doc); BasicDBObject searchDoc = new BasicDBObject().append("author_id", doc.getString("author_id")); BasicDBObject newDoc = new BasicDBObject("$set", doc); collection.findOneAndUpdate(searchDoc, newDoc, new FindOneAndUpdateOptions().upsert(true)); } @Override public void close() throws Exception { if (mongoClient != null) { mongoClient.close(); } } }