各函数的作用和用法请看下方代码中的注释：

  1. package cool.xiaokang.sql
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. import org.apache.spark.sql.functions._
  4. /**
  5. * SparkSql常用函数
  6. *
  7. * @author: xiaokang
  8. * 微信公众号:小康新鲜事儿
  9. * 小康个人文档:https://www.xiaokang.cool/
  10. */
  // One login record of the game-login data set.
  //
  // Sample row (raw data, fields listed in declaration order):
  //   1000 xiaokangXXs聆听ゝ尔伈 916458603 蔡文姬 123.234.77.79 微信 2020-03-20 09:30:11 2020-03-20
  //
  // Going by that sample, loginTime looks like "yyyy-MM-dd HH:mm:ss" and
  // loginDate like "yyyy-MM-dd"; both are kept as plain strings here.
  case class UserLogin(
      loginId: Int,      // numeric id of the login record (1000 in the sample)
      userName: String,  // display name of the user
      userId: String,    // user identifier, stored as text
      roleName: String,  // name of the role used for this login
      loginIp: String,   // IP address the login came from
      loginType: String, // login channel (the sample value 微信 means WeChat)
      loginTime: String, // login timestamp as text
      loginDate: String  // login date as text
  )
  /**
   * Demo of commonly used Spark SQL column functions (date arithmetic, date
   * parts, string concatenation and case conversion) applied to login records.
   */
  object SparkSqlDemo {

    /** Input path used by `main` when no command-line argument is given. */
    private val DefaultInputPath = "D:/IDEA-Projects/SparkApp/game_login_data"

    /** Number of tab-separated fields expected in one input line. */
    private val FieldCount = 8

    /**
     * Creates a SparkSession, or returns the existing one (`getOrCreate`).
     *
     * @param master  Spark master URL; defaults to `local[*]`
     * @param appName application name; defaults to this object's class name
     * @param isHive  when true, Hive support is enabled on the builder
     * @return the session produced by `getOrCreate()`
     */
    def createSparkSession(
        master: String = "local[*]",
        appName: String = this.getClass.getName,
        isHive: Boolean = false
    ): SparkSession = {
      // Fix: the Hive branch used to pass `master` as the application name,
      // silently ignoring the `appName` argument.
      val base = SparkSession.builder().master(master).appName(appName)
      val builder = if (isHive) base.enableHiveSupport() else base
      builder.getOrCreate()
    }

    /**
     * Parses one tab-separated input line into a [[UserLogin]].
     *
     * @throws IllegalArgumentException if the line has fewer than 8 fields
     * @throws NumberFormatException    if the first field is not an integer
     */
    private def parseUserLogin(line: String): UserLogin = {
      // limit = -1 keeps trailing empty fields; the default split drops them,
      // which made rows ending in an empty column fail with an index error.
      val fields = line.split("\t", -1)
      require(
        fields.length >= FieldCount,
        s"Expected at least $FieldCount tab-separated fields but found ${fields.length}: $line"
      )
      UserLogin(
        fields(0).toInt,
        fields(1),
        fields(2),
        fields(3),
        fields(4),
        fields(5),
        fields(6),
        fields(7)
      )
    }

    /**
     * Loads the data at `path` into a DataFrame.
     *
     * A session is obtained through `createSparkSession()` with its default
     * arguments and its log level is set to WARN.
     *
     * @param path  location of the input data
     * @param isRdd when true, the file is read as text lines, each line is parsed
     *              into a [[UserLogin]] and the result is converted with `toDF()`;
     *              when false, the raw result of `session.read.text(path)` is returned
     * @return the DataFrame together with the session that produced it, so the
     *         caller can stop the session when finished
     */
    def readDataSource(path: String, isRdd: Boolean = true): (DataFrame, SparkSession) = {
      val session = createSparkSession()
      session.sparkContext.setLogLevel("WARN")
      if (isRdd) {
        import session.implicits._
        val userLogins = session.sparkContext.textFile(path).map(parseUserLogin)
        (userLogins.toDF(), session)
      } else {
        (session.read.text(path), session)
      }
    }

    /**
     * Entry point: loads the login data, keeps the rows of one user and shows a
     * set of derived date and string columns.
     *
     * @param args optional; `args(0)` overrides the default input path
     */
    def main(args: Array[String]): Unit = {
      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val (userLoginDf, session) = readDataSource(inputPath)
      try {
        // userId is a string column; the SQL expression compares it with a numeric literal.
        userLoginDf.where("userId=916458603")
          // loginTime plus one day
          .withColumn("date_add", date_add(col("loginTime"), 1))
          // loginTime minus one day
          .withColumn("date_sub", date_sub(col("loginTime"), 1))
          // date of the first Monday after loginTime
          .withColumn("next_day", next_day(col("loginTime"), "MON"))
          // last day of loginTime's month
          .withColumn("last_day", last_day(col("loginTime")))
          // year
          .withColumn("year", year(col("loginTime")))
          // month
          .withColumn("month", month(col("loginTime")))
          // quarter
          .withColumn("quarter", quarter(col("loginTime")))
          // day of the year
          .withColumn("dayOfYear", dayofyear(col("loginTime")))
          // Unix timestamp of loginTime
          .withColumn("unixTimestamp", unix_timestamp(col("loginTime")))
          // first 10 characters of loginTime (the date part for "yyyy-MM-dd HH:mm:ss" values)
          .withColumn("parseToDay", substring(col("loginTime"), 1, 10))
          // plain string concatenation
          .withColumn("concat", concat(col("userName"), col("userId")))
          // concatenation with an explicit separator
          .withColumn("concat_ws", concat_ws("***", col("userName"), col("userId")))
          // capitalise the first letter of every word
          .withColumn("initcap", initcap(col("userName")))
          // lower case
          .withColumn("lower", lower(col("userName")))
          // upper case
          .withColumn("upper", upper(col("userName")))
          .show()
      } finally {
        // Release the session even if the job above fails.
        session.stop()
      }
    }
  }