Source

  1. package com.donson.uitls;
  2. import com.mongodb.MongoClient;
  3. import com.mongodb.MongoClientURI;
  4. import com.mongodb.client.FindIterable;
  5. import com.mongodb.client.MongoCollection;
  6. import org.apache.flink.configuration.Configuration;
  7. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  8. import org.bson.Document;
  9. import java.util.Properties;
  10. public class MongoSource extends RichSourceFunction<String> {
  11. private MongoClient mongoClient;
  12. private Properties properties;
  13. private MongoCollection<Document> collection;
  14. public MongoSource(Properties properties) {
  15. this.properties = properties;
  16. }
  17. @Override
  18. public void open(Configuration parameters) throws Exception {
  19. /*链接数据库*/
  20. mongoClient = new MongoClient(new MongoClientURI(properties.getProperty("uri")));
  21. collection = mongoClient.getDatabase(properties.getProperty("dbName")).getCollection(properties.getProperty("collectionName"));
  22. }
  23. @Override
  24. public void run(SourceContext<String> ctx) throws Exception {
  25. /*读取数据*/
  26. FindIterable<Document> documents = collection.find().batchSize(10);
  27. for(Document document: documents) {
  28. ctx.collect(document.toJson());
  29. }
  30. }
  31. @Override
  32. public void cancel() {
  33. if (this.mongoClient !=null) {
  34. this.mongoClient.close();
  35. }
  36. }
  37. }

Sink

  1. public static class InsertData extends RichSinkFunction<JSONObject> {
  2. private MongoClient mongoClient;
  3. private Properties properties;
  4. private MongoCollection<Document> collection;
  5. public InsertData(Properties properties) {
  6. this.properties = properties;
  7. }
  8. @Override
  9. public void open(Configuration parameters) throws Exception {
  10. super.open(parameters);
  11. mongoClient = new MongoClient(new MongoClientURI(properties.getProperty("uri")));
  12. collection = mongoClient.getDatabase(properties.getProperty("dbName")).getCollection(properties.getProperty("collectionName"));
  13. }
  14. @Override
  15. public void invoke(JSONObject value, Context context) throws Exception {
  16. Document doc = new Document();
  17. doc.append("author_id", value.getString("author_id"));
  18. doc.append("name", value.getString("name"));
  19. // collection.insertOne(doc);
  20. BasicDBObject searchDoc = new BasicDBObject().append("author_id", doc.getString("author_id"));
  21. BasicDBObject newDoc = new BasicDBObject("$set", doc);
  22. collection.findOneAndUpdate(searchDoc, newDoc, new FindOneAndUpdateOptions().upsert(true));
  23. }
  24. @Override
  25. public void close() throws Exception {
  26. if (mongoClient != null) {
  27. mongoClient.close();
  28. }
  29. }
  30. }