5.1 Intro
Hive -> Shark -> hive on Spark -> Spark SQL 进化史 https://zhuanlan.zhihu.com/p/29385628
Hive
SQL based on Hadoop,可看作编程接口,可将输入的SQL语句转换为mapreduce查询方法
Shark
(2014年停止,改为Spark SQL)
Hive on Spark, 为了实现与Hive兼容,Shark在HiveQL方面重用了相关解析,逻辑执行的翻译,逻辑优化等,可近似理解为,将物理执行从MapReduce作业替换成Spark作业,将SQL翻译成Spark上的RDD操作
优势
- 性能方面提升10-100倍,相比于Hive
缺陷
- 优化策略完全依赖Hive
- 由于Spark是线程级的并行,而MapReduce是进程级的并行。因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支
Spark SQL
A new SQL engine designed from ground-up for Spark
不再受限于Hive,只是兼容Hive
DataFrame
使用户可以在Spark SQL中执行SQL,数据可以是多种类型,文本/json
支持语言:java, scala, python
关系型数据库的不足**
只能进行聚合等SQL查询,不能进行过高级分析,如机器学习和图像处理
Spark SQL:为了实现同时对数据进行SQL查询,以及高级分析(机器学习)等多种数据处理方法,制作出Spark SQL
**
5. 2 DataFrame
功能
- 使Spark具备处理大规模结构化数据的能力,比RDD的处理效率更高
创建
SparkSession
应用程序指挥官 (app driver)
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# 读写
spark.read.text("file:///../people.txt")
spark.read.json("hdfs://../people.json")
spark.read.parquet("people.parquet")
spark.read.format("text").load("people.txt")
spark.read.format("json").load("people.json")
# 示例
df = spark.read.json("file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/examples/src/main/resources/people.json")
df.show()
保存
df.write.txt("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")
df.write.format("text").save("people.txt")
df.select("name").write.format("text").save("file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/mycode/sparksql/newpeople.txt")
上面的保存文件newpeople.txt是目录,其中有两个文件,加载时只要读取目录名称即可读取数据
常用操作
# 打印模式信息
df.printSchema()
# 取出特定列,并在某列+1,返回了新的df
df.select(df["name"], df["age"]+1).show()
# 过滤
df.filter(df["age"]>20).show()
# 分组
df.groupBy("age").count().show()
# 排序
df.sort(df["age"].desc()).show()
df.sort(df["age"].desc().df["name"].asc()).show()
5.3 DF和RDD转换
利用反射机制推断RDD
- 提前知晓数据结构 ```python from pyspark.sql import Row people = spark.sparkContext.\ textFile(“file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/examples/src/main/resources/people.txt”).\ map(lambda line: line.split(“,”)).\ map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = spark.createDataFrame(people)
必须注册为临时表才能SQL查询, people是临时表的名称
schemaPeople.createOrReplaceTempView(“people”) personsDF = spark.sql(“select name, age from people where age > 20”)
df中每个元素都是一行记录,包括p.name, p.age
df.rdd 将表中每一行作为一Row对象
personsRDD = personsDF.rdd.map(lambda p: “Name: “+p.name+”,”+”Age: “+str(p.age)) persons.foreach(print)
<a name="08jnz"></a>
## 利用编程方式定义RDD
1. 制作表头
1. 制作表中记录,封装为Row对象中
1. 将表头和记录拼接在一起
```python
from pyspark.sql.types import *
from pyspark.sql import Row
# 表头
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
schema = StructType(fields)
# 生成表中记录
lines = spark.sparkContext.textFile("file:///usr/local/spark/spark-3.0.1-bin-hadoop3.2/examples/src/main/resources/people.txt")
parts = lines.map(lambda x: x.split(","))
people = parts.map(lambda p: Row(p[0], p[1].strip()))
# 拼接
schemaPeople = spark.createDataFrame(people, schema)
# 注册临时表,SQL查询
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("select name, age from people")
results.show()
MySQL 数据操作
使用JDBC连接Mysql,首先要安装驱动程序https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.22
cp ~/Downloads/mysql-connector-java-8.0.22.jar $SPARK_HOME/jars/
读取数据
# option 需要一个一个指定
jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/spark").option("dbtable", "student").option("user", "root").option("password", "fieldwater").load()
jdbcDF.show()
写入数据
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# 设置模式信息:表头
schema = StructType([StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True),
])
# 学生信息
studentRDD = spark.sparkContext.parallelize([
"3 Rongcheng M 26",
"4 Guanhua M 27"
]).map(lambda x: x.split(" "))
rowRDD = studentRDD.map(lambda p: Row(
int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())
))
# 拼接
studentDF = spark.createDataFrame(rowRDD, schema)
# 写入数据库
prop = {'user': 'root', 'password': 'fieldwater', 'driver': 'com.mysql.cj.jdbc.Driver'}
studentDF.write.jdbc(url="jdbc:mysql://localhost:3306/spark", table='student', 'append', properties=prop)