看注释
package cool.xiaokang.sql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
 * Commonly used Spark SQL functions.
 *
 * @author xiaokang
 *         WeChat official account: 小康新鲜事儿
 *         Personal docs: https://www.xiaokang.cool/
 */

// Sample input record (fields are tab-separated in the data file):
// 1000 xiaokangXXs聆听ゝ尔伈 916458603 蔡文姬 123.234.77.79 微信 2020-03-20 09:30:11 2020-03-20
case class UserLogin(loginId: Int,
                     userName: String,
                     userId: String,
                     roleName: String,
                     loginIp: String,
                     loginType: String,
                     loginTime: String,
                     loginDate: String)

object SparkSqlDemo {

  /** Input path used by `main` when no path is supplied on the command line. */
  private val DefaultDataPath = "D:/IDEA-Projects/SparkApp/game_login_data"

  /**
   * Creates (or reuses, via `getOrCreate`) a SparkSession.
   *
   * @param master  Spark master URL
   * @param appName application name shown by Spark
   * @param isHive  when true, the session is built with Hive support enabled
   * @return the SparkSession
   */
  def createSparkSession(master: String = "local[*]",
                         appName: String = this.getClass.getName,
                         isHive: Boolean = false): SparkSession = {
    // Fix: the Hive branch used to pass `master` to appName(), so the
    // application was named after the master URL instead of `appName`.
    val builder = SparkSession.builder().master(master).appName(appName)
    (if (isHive) builder.enableHiveSupport() else builder).getOrCreate()
  }

  /**
   * Loads the data at `path` as a DataFrame.
   *
   * @param path  location of the input data
   * @param isRdd when true, the file is read as an RDD of lines, each line is
   *              split on tabs and mapped to a [[UserLogin]]; when false, the
   *              file is read with `session.read.text` as-is
   * @return the DataFrame together with the SparkSession that produced it
   */
  def readDataSource(path: String, isRdd: Boolean = true): (DataFrame, SparkSession) = {
    val session = createSparkSession()
    session.sparkContext.setLogLevel("WARN")
    if (isRdd) {
      import session.implicits._
      val userLogins = session.sparkContext.textFile(path).map { line =>
        // limit = -1 keeps trailing empty fields; the default split drops them,
        // which made a record with an empty last column fail on fields(7).
        val fields = line.split("\t", -1)
        UserLogin(fields(0).toInt, fields(1), fields(2), fields(3),
          fields(4), fields(5), fields(6), fields(7))
      }
      (userLogins.toDF(), session)
    } else {
      (session.read.text(path), session)
    }
  }

  /**
   * Demonstrates common date and string functions on the login data of one user.
   *
   * @param args optional; `args(0)` overrides the default input path
   */
  def main(args: Array[String]): Unit = {
    val dataPath = args.headOption.getOrElse(DefaultDataPath)
    val (userLoginDf, session) = readDataSource(dataPath)
    try {
      val loginTime = col("loginTime")
      val userName = col("userName")
      val userId = col("userId")

      userLoginDf.where("userId=916458603")
        // loginTime plus one day
        .withColumn("date_add", date_add(loginTime, 1))
        // loginTime minus one day
        .withColumn("date_sub", date_sub(loginTime, 1))
        // date of the next Monday after loginTime
        .withColumn("next_day", next_day(loginTime, "MON"))
        // last day of loginTime's month
        .withColumn("last_day", last_day(loginTime))
        // year
        .withColumn("year", year(loginTime))
        // month
        .withColumn("month", month(loginTime))
        // quarter
        .withColumn("quarter", quarter(loginTime))
        // day of the year
        .withColumn("dayOfYear", dayofyear(loginTime))
        // Unix timestamp
        .withColumn("unixTimestamp", unix_timestamp(loginTime))
        // date part: the first 10 characters of loginTime
        .withColumn("parseToDay", substring(loginTime, 1, 10))
        // string concatenation
        .withColumn("concat", concat(userName, userId))
        // string concatenation with an explicit separator
        .withColumn("concat_ws", concat_ws("***", userName, userId))
        // upper-case the first letter of every word
        .withColumn("initcap", initcap(userName))
        // lower case
        .withColumn("lower", lower(userName))
        // upper case
        .withColumn("upper", upper(userName))
        .show()
    } finally {
      // Release the session's resources even if the job fails.
      session.stop()
    }
  }
}