1. Launch spark-shell

  spark2-shell \
    --name wsy \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0
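
Alternatively, the connector can be pointed at the input and output collections at launch time via the spark.mongodb.input.uri and spark.mongodb.output.uri properties; a sketch reusing the placeholder credentials from the configuration below:

  spark2-shell \
    --name wsy \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
    --conf "spark.mongodb.input.uri=mongodb://user:123456@hostip:3717/inputdb.student" \
    --conf "spark.mongodb.output.uri=mongodb://user:123456@hostip:3717/outputdb.student"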

2. Connection configuration

  object MongoInput {
    val uri = getUri("student")

    // Builds a connection string of the form mongodb://user:password@host/db.collection;
    // user, password, and host here are placeholders.
    private def getUri(collection: String): String = {
      val user = "user"
      val password = "123456"
      val host = "hostip:3717"
      val db = "inputdb"
      s"mongodb://${user}:${password}@${host}/${db}.${collection}"
    }
  }

  object MongoOutput {
    val uri = getUri("student")

    private def getUri(collection: String): String = {
      val user = "user"
      val password = "123456"
      val host = "hostip:3717"
      val db = "outputdb"
      s"mongodb://${user}:${password}@${host}/${db}.${collection}"
    }
  }
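
The two objects differ only in the database name, so they could be collapsed into a single helper. A minimal sketch, assuming credentials are read from hypothetical MONGO_USER / MONGO_PASSWORD environment variables instead of being hard-coded:

  object MongoConfig {
    // MONGO_USER / MONGO_PASSWORD are assumed env vars; fall back to the placeholders above.
    private val user = sys.env.getOrElse("MONGO_USER", "user")
    private val password = sys.env.getOrElse("MONGO_PASSWORD", "123456")
    private val host = "hostip:3717"

    // Same connection-string shape as getUri above, parameterized by database.
    def uri(db: String, collection: String): String =
      s"mongodb://${user}:${password}@${host}/${db}.${collection}"
  }

  // Usage:
  //   MongoConfig.uri("inputdb", "student")   // replaces MongoInput.uri
  //   MongoConfig.uri("outputdb", "student")  // replaces MongoOutput.uri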

3. Read from and write to MongoDB

  import com.mongodb.spark.config.{ReadConfig, WriteConfig}
  import com.mongodb.spark.sql._
  import org.apache.spark.sql.{SaveMode, SparkSession}
  import mongo.util.{MongoInput, MongoOutput}

  object Hello {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      import spark.implicits._

      // The partitioner is selected by name; DefaultMongoPartitioner is the connector's default.
      val readConfig = ReadConfig(Map("uri" -> MongoInput.uri, "partitioner" -> "DefaultMongoPartitioner"))
      val writeConfig = WriteConfig(Map("uri" -> MongoOutput.uri))

      // spark.read.mongo and df.write.mongo come from the com.mongodb.spark.sql._ implicits.
      val df = spark.read.mongo(readConfig)
      df.write.mode(SaveMode.Append).mongo(writeConfig)
    }
  }
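
The same read/write path is also available through the connector's MongoSpark helper object, equivalent to the implicit .mongo syntax above; a minimal sketch, assuming the spark, readConfig, and writeConfig values from the program above are in scope:

  import com.mongodb.spark.MongoSpark

  // Load a DataFrame; the schema is inferred by sampling documents from inputdb.student.
  val mongoDf = MongoSpark.load(spark, readConfig)

  // Insert the rows into outputdb.student using the explicit WriteConfig.
  MongoSpark.save(mongoDf, writeConfig)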