1. 启动 spark-shell
spark2-shell --name wsy --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0
2. 配置设置
// Connection-string builders for the source (input) and sink (output) MongoDB databases.
// NOTE(review): credentials and host are hard-coded for the demo — move to configuration
// or a secrets store before real use.
object MongoInput {
  // Mongo URI for the source `student` collection in `inputdb`.
  val uri: String = buildUri("student")

  private def buildUri(collection: String): String = {
    val user     = "user"
    val password = "123456"
    val host     = "hostip:3717"
    val db       = "inputdb"
    s"mongodb://$user:$password@$host/$db.$collection"
  }
}

object MongoOutput {
  // Mongo URI for the sink `student` collection in `outputdb`.
  val uri: String = buildUri("student")

  private def buildUri(collection: String): String = {
    val user     = "user"
    val password = "123456"
    val host     = "hostip:3717"
    val db       = "outputdb"
    s"mongodb://$user:$password@$host/$db.$collection"
  }
}
3. 读写 mongo
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.mongodb.spark.rdd.partitioner.MongoSinglePartitioner
import com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner
import com.mongodb.spark.sql._
import mongo.util.{MongoInput, MongoOutput}

/**
 * Batch job: reads the source `student` collection (URI from [[MongoInput]])
 * and appends every document into the sink collection (URI from [[MongoOutput]]).
 */
object Hello {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    import spark.implicits._

    // BUG FIX: was `MongoINput` (typo) — the helper object is declared as `MongoInput`,
    // so the original import/usage would not resolve.
    val inputUri = MongoInput.uri
    // DefaultMongoPartitioner samples the collection to produce balanced partitions.
    val readConfig = ReadConfig(Map("uri" -> inputUri, "partitioner" -> "DefaultMongoPartitioner"))

    val outputUri = MongoOutput.uri
    val writeConfig = WriteConfig(Map("uri" -> outputUri))

    // Copy: full read of the source collection, appended into the sink.
    val df = spark.read.mongo(readConfig)
    df.write.mode(SaveMode.Append).mongo(writeConfig)

    // Release the local Spark resources once the batch copy completes.
    // (Omit this line if pasting the body directly into an interactive spark-shell.)
    spark.stop()
  }
}