1 Hive-On-Spark

Hive On Spark (跟hive没太大的关系,就是使用了hive的标准(HQL, 元数据库、UDF、序列化、反序列化机制))

Hive原来的计算模型是MR,有点慢(将中间结果写入到HDFS中)

Hive On Spark 使用RDD(DataFrame),然后运行在spark 集群上

真正要计算的数据是保存在HDFS中,mysql这个元数据库,保存的是hive表的描述信息,描述了有哪些database、table、以及表有多少列,每一列是什么类型,还要描述表的数据保存在hdfs的什么位置?

hive跟mysql的区别?

hive是一个数据仓库(存储数据并分析数据,分析数据仓库中的数据量很大,一般要分析很长的时间)
mysql是一个关系型数据库(关系型数据的增删改查(低延迟))

hive的元数据库中保存要计算的数据吗?
不保存,保存hive仓库的表、字段、等描述信息

真正要计算的数据保存在哪里了?
保存在HDFS中了

hive的元数据库的功能
建立了一种映射关系,执行HQL时,先到MySQL元数据库中查找描述信息,然后根据描述信息生成任务,然后将任务下发到spark集群中执行

1.1 已存在的hive数据仓库

使用 # hive 进入hive shell

create table t_access_times(username string,month string,salary int)
row format delimited fields terminated by ‘,’;

load data local inpath ‘/root/accdata.txt’ into table t_access_times;

A,2015-01,5
A,2015-01,15
B,2015-01,5
A,2015-01,8
B,2015-01,25
A,2015-01,5
C,2015-01,10
C,2015-01,20
A,2015-02,4
A,2015-02,6
C,2015-02,30
C,2015-02,10
B,2015-02,10
B,2015-02,5
A,2015-03,14
A,2015-03,6
B,2015-03,20
B,2015-03,25
C,2015-03,10
C,2015-03,20

简单测试
hive > select * from t_access_times;
求每个用户的月总金额
hive > select username,month,sum(salary) as salary from t_access_times group by username,month;

spark程序

要先开启spark对hive的支持

//如果想让hive运行在spark上,一定要开启spark对hive的支持val session = SparkSession.builder()
.master(“local”)
.appName(“xx”)
.enableHiveSupport() // 启动对hive的支持, 还需添加支持jar包
.getOrCreate()

要添加spark对hive的兼容jar包

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

还要把hive的元数据配置文件hive-site.xml添加到spark的conf目录下。
如果在本地测试,直接把hive-site.xml文件拷贝到resource目录下。
resources目录,存放着当前项目的配置文件
HiveOnSpark - 图1

hive-site配置文件

| <configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hdp-04:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
</configuration> | | —- |

编写代码,local模式下测试:

// 执行查询val query = session.sql(“select * from t_access_times”)
query.show()_// 释放资源_session.close()

创建表的时候,需要伪装客户端身份
报错:
HiveOnSpark - 图2

System.setProperty(“HADOOP_USER_NAME”, “root”) // 伪装客户端的用户身份为root
// 或者添加运行参数 –DHADOOP_USER_NAME=root

基本操作

| _// 求每个用户的每月总金额
// session.sql(“select username,month,sum(salary) as salary from t_access_times group by username,month”)
// 创建表
// session.sql(“create table t_access1(username string,month string,salary int) row format delimited fields terminated by ‘,’”)

  1. // 删除表<br /> // session.sql("drop table t_access1")
  2. // 插入数据<br /> // session.sql("insert into t_access1 select * from t_access_times")<br /> // .show()<br /> // 覆盖写数据<br /> // session.sql("insert __overwrite __table t_access1 select * from t_access_times where username='A'")
  3. // 覆盖load新数据<br /> // C,2015-01,10<br /> // C,2015-01,20<br /> // session.sql("load data local inpath 't_access_time_log' overwrite into table t_access1")
  4. // 清空数据<br /> // session.sql("truncate table t_access1")
  5. // .show()
  6. // 写入自定义数据<br /> _**val **access: Dataset[String] = session.createDataset(_List_(**"b,2015-01,10"**, **"c,2015-02,20"**))
  7. **val **accessdf = access.map({<br /> t =><br /> **val **lines = t.split(**","**)<br /> (lines(0), lines(1), lines(2).toInt)<br /> }).toDF(**"username"**, **"month"**, **"salary"**)
  8. _// .show()
  9. _accessdf.createTempView(**"t_ac"**)<br /> _// session.sql("insert into t_access1 select * from t_ac")
  10. // overwrite模式会重新创建新的表 根据指定schema信息 SaveMode.Overwrite<br /> // 本地模式只支持 overwrite,必须在sparksession上添加配置参数:<br />// .config("spark.sql.warehouse.dir", "hdfs://hdp-01:9000/user/hive/warehouse")<br /> _accessdf<br /> .write.mode(**"overwrite"**).saveAsTable(**"t_access1"**) |

| —- |

集群运行:
需要把hive-site.xml配置文件,添加到$SPARK_HOME/conf目录中去,重启spark
上传一个mysql连接驱动(sparkSubmit也要连接MySQL,获取元数据信息)
spark-sql —master spark://hdp-01:7077 —driver-class-path /root/mysql-connector-java-5.1.38.jar
—class xx.jar

1.2 完全抛离hive

hive on spark 使用的仅仅是hive的标准,规范,不需要有hive数据库一样可行。
hive : 元数据,是存放在mysql中,然后真正的数据是存放在hdfs中。

1,安装mysql,
使用# yum -y install mysql mysql-server

安装MySQL(hive的元数据库),并且授权root用户
GRANT ALL PRIVILEGES ON . TO ‘root’@’%’ IDENTIFIED BY ‘123456’ WITH GRANT OPTION;
FLUSH PRIVILEGES;

到此为止,hive使用的云数据存放的mysql数据库,已经准备完毕。

2,生成hive的元数据库表,根据hive的配置文件,生成对应的元数据库表。

spark-sql 是spark专门用于编写sql的交互式命令行。
当直接启动spark-sql以local模式运行时,如果报错:
HiveOnSpark - 图3
是因为配置了Hadoop的配置参数导致的:
HiveOnSpark - 图4

hive-site.xml文件:

| <configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hdp-01:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>

  1. <**property**><br /> <**name**>javax.jdo.option.ConnectionDriverName</**name**><br /> <**value**>com.mysql.jdbc.Driver</**value**><br /> <**description**>Driver class name for a JDBC metastore</**description**><br /> </**property**>
  2. <**property**><br /> <**name**>javax.jdo.option.ConnectionUserName</**name**><br /> <**value**>root</**value**><br /> <**description**>username to use against metastore database</**description**><br /> </**property**>
  3. <**property**><br /> <**name**>javax.jdo.option.ConnectionPassword</**name**><br /> <**value**>123456</**value**><br /> <**description**>password to use against metastore database</**description**><br /> </**property**><br /></**configuration**> |

| —- |

把hive-site.xml文件添加到spark的conf目录下。
然后呢把该配置文件,发送给集群中的其他节点:

cd /root/apps/spark-2.2.0-bin-hadoop2.7/conf/
for i in 2 3 ;do scp hive-site.xml hdp-0$i:pwd ;done

重新停止并重启spark: start-all.sh

启动spark-sql时,
出现如下错误是因为操作mysql时缺少mysql的驱动jar包,
解决方案1:—jars 或者 —driver-class-path 引入msyql的jar包
解决方案2: 把mysql的jar包添加到$spark_home/jars目录下
HiveOnSpark - 图5

启动时指定集群:(如果不指定master,默认就是local模式)
spark-sql —master spark://hdp-01:7077 —jars /root/mysql-connector-java-5.1.38.jar

sparkSQL会在mysql上创建一个database,需要手动改一下DBS表中的DB_LOCATION_UIR改成hdfs的地址
HiveOnSpark - 图6

也需要查看一下,自己创建的数据库表的存储路径是否是hdfs的目录。
HiveOnSpark - 图7

执行spark-sql任务之后:可以在集群的监控界面查看
HiveOnSpark - 图8
同样 ,会有SparkSubmit进程存在。

如果是在local模式下编写程序:
把hive-site.xml文件添加到resouce目录下
需要spark开启对hive的支持,然后添加spark-hive的jar包
然后执行代码的编写:

| object HiveOnSpark {
def main(args: Array[String]): Unit = {
_// 如果spark想要操作hive,就需要开启对hive的支持

  1. // 伪装用户的身份<br />// System.setProperty("HADOOP_USER_NAME", "root")<br /> _**val **session = SparkSession._builder_()_// .master("local")<br /> _.appName(IPsql.getClass.getSimpleName)<br /> .enableHiveSupport() _// 开启对hive的支持<br /> _.getOrCreate()
  2. **import **session.implicits._
  3. _// 执行查询 hive的数据表<br />// session.sql("select * from t_access_times")<br />// .show()
  4. // 创建表<br />// session.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")

// session.sql(“insert into t_access1 select * from t_access_times”)
// .show()

  1. // 写数据<br /> _**val **access: Dataset[String] = session.createDataset(_List_(**"b,2015-01,10"**, **"c,2015-02,20"**))
  2. **val **accessdf = access.map({<br /> t =><br /> **val **lines = t.split(**","**)<br /> (lines(0), lines(1), lines(2).toInt)<br /> }).toDF(**"username"**, **"month"**, **"salary"**)
  3. accessdf.createTempView(**"v_tmp"**)<br /> _// 插入数据<br />// session.sql("insert overwrite table t_access1 select * from v_tmp")<br /> _session.sql(**"insert into t_access1 select * from v_tmp"**)_// .show()_<br />_// insertInto的api 入库_accessdf.write.insertInto(**"databaseName.tableName"**)<br />_ _session.close()<br /> }

} | | —- |

在集群模式下,依然可以通过spark-submit来提交任务:
spark-submit —master spark://hdp-01:7077 —jars mysql-connector-java-5.1.38.jar —class cn.edu360.spark28.day08.HiveOnSpark original-spark28-1.0-SNAPSHOT.jar