1. Start spark-shell
Launch the shell with the MongoDB Spark connector on the classpath; --packages resolves the artifact from Maven Central at startup.
spark2-shell \
--name wsy \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0
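With the shell up, the connector can be smoke-tested before writing any application code. A minimal sketch to run inside spark-shell, assuming the placeholder credentials from step 2 (spark is the session the shell already provides):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Placeholder URI built from the step-2 values; substitute your own user/password/host.
val readConfig = ReadConfig(Map("uri" -> "mongodb://user:123456@hostip:3717/inputdb.student"))
val df = MongoSpark.load(spark, readConfig)
df.printSchema()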
2. Connection configuration
Two small Scala objects build the input and output connection URIs.
// Both helpers live in package mongo.util, matching the import in step 3.
package mongo.util

object MongoInput {
  val uri: String = getUri("student")

  private def getUri(collection: String): String = {
    val user = "user"
    val password = "123456"
    val host = "hostip:3717"
    val db = "inputdb"
    s"mongodb://${user}:${password}@${host}/${db}.${collection}"
  }
}

object MongoOutput {
  val uri: String = getUri("student")

  private def getUri(collection: String): String = {
    val user = "user"
    val password = "123456"
    val host = "hostip:3717"
    val db = "outputdb"
    s"mongodb://${user}:${password}@${host}/${db}.${collection}"
  }
}
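The two objects differ only in the database name, so the URI construction could be shared. One possible refactoring (MongoUri is a hypothetical helper introduced here, not part of the connector):

object MongoUri {
  private val user = "user"
  private val password = "123456"
  private val host = "hostip:3717"

  // Build a connection URI for any database/collection pair.
  def apply(db: String, collection: String): String =
    s"mongodb://${user}:${password}@${host}/${db}.${collection}"
}

With this in place, MongoUri("inputdb", "student") and MongoUri("outputdb", "student") reproduce MongoInput.uri and MongoOutput.uri.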
3. Reading from and writing to MongoDB
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.mongodb.spark.sql._ // adds the .mongo() methods on DataFrameReader/DataFrameWriter
import org.apache.spark.sql.{SaveMode, SparkSession}
import mongo.util.{MongoInput, MongoOutput}

object Hello {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    // The partitioner is selected by name in the config;
    // "DefaultMongoPartitioner" is the connector's default strategy.
    val readConfig = ReadConfig(Map("uri" -> MongoInput.uri, "partitioner" -> "DefaultMongoPartitioner"))
    val writeConfig = WriteConfig(Map("uri" -> MongoOutput.uri))

    // Load the input collection into a DataFrame and append it to the output collection.
    val df = spark.read.mongo(readConfig)
    df.write.mode(SaveMode.Append).mongo(writeConfig)
  }
}
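The same round trip can also be written with the MongoSpark helper object instead of the implicit .mongo() syntax; both entry points ship with connector 2.x. A sketch under the same configuration as above:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.sql.SaveMode
import mongo.util.{MongoInput, MongoOutput}

// `spark` is the SparkSession created in Hello.main (or the shell's built-in one).
val readConfig = ReadConfig(Map("uri" -> MongoInput.uri, "partitioner" -> "DefaultMongoPartitioner"))
val writeConfig = WriteConfig(Map("uri" -> MongoOutput.uri))

val df = MongoSpark.load(spark, readConfig) // equivalent to spark.read.mongo(readConfig)
MongoSpark.save(df.write.mode(SaveMode.Append), writeConfig) // equivalent to df.write.mongo(writeConfig)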