Song Ming's notes — dependencies to add (build.sbt fragment)
//If you also want MLlib and Spark Hive, add the corresponding libraryDependencies below.
//Song Ming (Spark) 2015-07-28 17:36:24
// breeze: numerical-processing library
libraryDependencies += "org.scalanlp" %% "breeze" % "0.10" % "provided"
// breeze-natives: native (BLAS/LAPACK) backends for breeze
libraryDependencies += "org.scalanlp" %% "breeze-natives" % "0.10" % "provided"
// Spark GraphX 1.2.1, provided by the cluster at runtime.
// The excludes drop transitive artifacts that clash with other jars on the classpath.
libraryDependencies += ("org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided")
  .exclude("org.eclipse.jetty.orbit", "javax.transaction")
  .exclude("org.eclipse.jetty.orbit", "javax.servlet")
  .exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish")
  .exclude("com.esotericsoftware.kryo", "kryo")
  .exclude("commons-beanutils", "commons-beanutils")
  .exclude("commons-collections", "commons-collections")
// Spark MLlib 1.2.1, provided by the cluster at runtime.
// Same exclusion set as the other Spark modules to avoid classpath conflicts.
libraryDependencies += ("org.apache.spark" %% "spark-mllib" % "1.2.1" % "provided")
  .exclude("org.eclipse.jetty.orbit", "javax.transaction")
  .exclude("org.eclipse.jetty.orbit", "javax.servlet")
  .exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish")
  .exclude("com.esotericsoftware.kryo", "kryo")
  .exclude("commons-beanutils", "commons-beanutils")
  .exclude("commons-collections", "commons-collections")
// Spark core 1.2.1, provided by the cluster at runtime.
// Note: this module's exclusion list differs slightly from graphx/mllib
// (javax.activation and commons-beanutils-core instead of commons-collections).
libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.2.1" % "provided")
  .exclude("org.eclipse.jetty.orbit", "javax.servlet")
  .exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish")
  .exclude("org.eclipse.jetty.orbit", "javax.activation")
  .exclude("com.esotericsoftware.kryo", "kryo")
  .exclude("commons-beanutils", "commons-beanutils-core")
  .exclude("commons-beanutils", "commons-beanutils")
// NOTE(review): every other Spark module above is pinned to 1.2.1, but spark-hive
// was at 1.0.0. Mixing Spark artifact versions on one classpath risks binary
// incompatibility, so the version is aligned to 1.2.1 here.
// libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.2.1" % "provided"
Handling the logs without Hive SQL (HSQL) — not very meaningful; path of the input:
hdfs:///home/hdp-like/channel/online/pay_data/2015-06-20/04_pay_channel.log
// Spark-shell snippet (Spark 1.2.x): load the first-login detail table and count its rows.
// `sc` is the SparkContext provided by the shell.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Only the first column is used here; the full schema for reference was:
// case class FirstLogin(qid: String, channel: String, app_key: String, login_date: String)
case class FirstLogin(qid: String)

// Input lines are tab-separated; keep only field 0 (qid).
// Renamed from `rdd`: toDF() yields a DataFrame, not an RDD.
val firstLoginDF = sc.textFile("hdfs:///home/hdp-like/zhushou/db/first_login_detail_qid_channel_appkey/*")
  .map(_.split("\t"))
  .map(p => FirstLogin(p(0)))
  .toDF()
firstLoginDF.registerTempTable("table_test")

val res = sqlContext.sql("SELECT count(1) FROM table_test")
// Spark is lazy: without an action the query above never executes, so materialize it.
res.collect().foreach(println)
// Three ways to read fields from a query result Row.
// Fixed from the Spark docs example, which referenced an undefined `teenagers`
// DataFrame with `name`/`age` columns that do not exist in table_test.
val qids = sqlContext.sql("SELECT qid FROM table_test")
// by positional index:
qids.map(t => "qid: " + t(0)).collect().foreach(println)
// or by field name:
qids.map(t => "qid: " + t.getAs[String]("qid")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
qids.map(_.getValuesMap[Any](List("qid"))).collect().foreach(println)
// e.g. Map("qid" -> "12345")