5.1 Intro

Hive -> Shark -> hive on Spark -> Spark SQL 进化史 https://zhuanlan.zhihu.com/p/29385628

Hive

SQL based on Hadoop,可看作编程接口,可将输入的SQL语句转换为mapreduce查询方法

Shark

(2014年停止,改为Spark SQL)

Hive on Spark, 为了实现与Hive兼容,Shark在HiveQL方面重用了相关解析,逻辑执行的翻译,逻辑优化等,可近似理解为,将物理执行从MapReduce作业替换成Spark作业,将SQL翻译成Spark上的RDD操作

优势

  • 性能方面提升10-100倍,相比于Hive

缺陷

  • 优化策略完全依赖Hive
  • 由于Spark是线程级的并行,而MapReduce是进程级的并行。因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支

Spark SQL

A new SQL engine designed from ground-up for Spark
不再受限于Hive,只是兼容Hive

DataFrame
使用户可以在Spark SQL中执行SQL,数据可以是多种类型,文本/json

支持语言:java, scala, python

关系型数据库的不足**
只能进行聚合等SQL查询,不能进行过高级分析,如机器学习和图像处理

Spark SQL:为了实现同时对数据进行SQL查询,以及高级分析(机器学习)等多种数据处理方法,制作出Spark SQL
**


5. 2 DataFrame


功能

  • 使Spark具备处理大规模结构化数据的能力,比RDD的处理效率更高

创建

SparkSession 应用程序指挥官 (app driver)

  1. from pyspark import SparkContext, SparkConf
  2. from pyspark.sql import SparkSession
  3. spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
  4. # 读写
  5. spark.read.text("file:///../people.txt")
  6. spark.read.json("hdfs://../people.json")
  7. spark.read.parquet("people.parquet")
  8. spark.read.format("text").load("people.txt")
  9. spark.read.format("json").load("people.json")
  10. # 示例
  11. df = spark.read.json("file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/examples/src/main/resources/people.json")
  12. df.show()

保存

  1. df.write.txt("people.txt")
  2. df.write.json("people.json")
  3. df.write.parquet("people.parquet")
  4. df.write.format("text").save("people.txt")
  5. df.select("name").write.format("text").save("file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/mycode/sparksql/newpeople.txt")

上面的保存文件newpeople.txt是目录,其中有两个文件,加载时只要读取目录名称即可读取数据

常用操作

  1. # 打印模式信息
  2. df.printSchema()
  3. # 取出特定列,并在某列+1,返回了新的df
  4. df.select(df["name"], df["age"]+1).show()
  5. # 过滤
  6. df.filter(df["age"]>20).show()
  7. # 分组
  8. df.groupBy("age").count().show()
  9. # 排序
  10. df.sort(df["age"].desc()).show()
  11. df.sort(df["age"].desc().df["name"].asc()).show()

5.3 DF和RDD转换

利用反射机制推断RDD

  • 提前知晓数据结构 ```python from pyspark.sql import Row people = spark.sparkContext.\ textFile(“file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/examples/src/main/resources/people.txt”).\ map(lambda line: line.split(“,”)).\ map(lambda p: Row(name=p[0], age=int(p[1])))

schemaPeople = spark.createDataFrame(people)

必须注册为临时表才能SQL查询, people是临时表的名称

schemaPeople.createOrReplaceTempView(“people”) personsDF = spark.sql(“select name, age from people where age > 20”)

df中每个元素都是一行记录,包括p.name, p.age

df.rdd 将表中每一行作为一Row对象

personsRDD = personsDF.rdd.map(lambda p: “Name: “+p.name+”,”+”Age: “+str(p.age)) persons.foreach(print)

  1. <a name="08jnz"></a>
  2. ## 利用编程方式定义RDD
  3. 1. 制作表头
  4. 1. 制作表中记录,封装为Row对象中
  5. 1. 将表头和记录拼接在一起
  6. ```python
  7. from pyspark.sql.types import *
  8. from pyspark.sql import Row
  9. # 表头
  10. schemaString = "name age"
  11. fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
  12. schema = StructType(fields)
  13. # 生成表中记录
  14. lines = spark.sparkContext.textFile("file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/examples/src/main/resources/people.txt")
  15. parts = lines.map(lambda x: x.split(","))
  16. people = parts.map(lambda p: Row(p[0], p[1].strip()))
  17. # 拼接
  18. schemaPeople = spark.createDataFrame(people, schema)
  19. # 注册临时表,SQL查询
  20. schemaPeople.createOrReplaceTempView("people")
  21. results = spark.sql("select name, age from people")
  22. results.show()

MySQL 数据操作

使用JDBC连接Mysql,首先要安装驱动程序https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.22

  1. cp ~/Downloads/mysql-connector-java-8.0.22.jar $SPARK_HOME/jars/

读取数据

  1. # option 需要一个一个指定
  2. jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/spark").option("dbtable", "student").option("user", "root").option("password", "fieldwater").load()
  3. jdbcDF.show()

写入数据

  1. from pyspark.sql import Row
  2. from pyspark.sql.types import *
  3. from pyspark import SparkContext, SparkConf
  4. from pyspark.sql import SparkSession
  5. spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
  6. # 设置模式信息:表头
  7. schema = StructType([StructField("id", IntegerType(), True),
  8. StructField("name", StringType(), True),
  9. StructField("gender", StringType(), True),
  10. StructField("age", IntegerType(), True),
  11. ])
  12. # 学生信息
  13. studentRDD = spark.sparkContext.parallelize([
  14. "3 Rongcheng M 26",
  15. "4 Guanhua M 27"
  16. ]).map(lambda x: x.split(" "))
  17. rowRDD = studentRDD.map(lambda p: Row(
  18. int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())
  19. ))
  20. # 拼接
  21. studentDF = spark.createDataFrame(rowRDD, schema)
  22. # 写入数据库
  23. prop = {'user': 'root', 'password': 'fieldwater', 'driver': 'com.mysql.cj.jdbc.Driver'}
  24. studentDF.write.jdbc(url="jdbc:mysql://localhost:3306/spark", table='student', 'append', properties=prop)