连接

场景较多

问题解决:spark-shell可以读写hive,但spark-submit不行

场景:
hive中有两张表,用spark-shell可以获取,但用spark-submit只能获取一张

  1. hive
  2. hive> show databases;
  3. default
  4. test
  5. spark-shell
  6. >> sql(" show databases").show()
  7. >>
  8. +-------------------+
  9. |databaseName |
  10. +-------------------+
  11. | default|
  12. | test|
  13. +-------------------+
  14. spark-submit
  15. from pyspark.sql.session import SparkSession
  16. from pyspark.context import SparkContext
  17. sc = SparkContext.getOrCreate()
  18. spark = SparkSession(sc)
  19. spark.sql(" show databases").show()
  20. +-------------------+
  21. |databaseName |
  22. +-------------------+
  23. | default|
  24. +-------------------+

$SPARK_HOME/bin/spark-shell启动的类为org.apache.spark.repl.Main,部分源码如下:

  1. ...
  2. sparkContext = sparkSession.sparkContext
  3. sparkSession
  4. ...

问题在于源代码是先创建SparkSession,再从SparkSession中获取SparkContext,另外注意到之前有个WARN级别的日志。
19/05/31 13:11:31 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
因此只需要调整顺序即可。

  1. from pyspark.conf import SparkConf
  2. spark = SparkSession.builder.config(conf=SparkConf()).enableHiveSupport().getOrCreate()
  3. sc = spark.sparkContext
  4. spark.sql(" show databases").show()
  5. +-------------------+
  6. |databaseName |
  7. +-------------------+
  8. | default|
  9. | test|
  10. +-------------------+

读写

关键点:
一:API
saveAsTable 会根据表的schema匹配df的字段进行存储
insertInto,要求表的schema与df必须一致才可以
对于Hive分区表的写入,insertInto要待参数覆盖为True,这样每次会覆盖分区。注意不要使用saveAsTable!,会将全表覆盖,
正确语句,具体变化参考pyspark版本:
df.write.format(“hive”).insertInto(“dev.dev_rep_rebate_bjcouple_partion_orc”,True)
二:sql方式
将df创建为临时表,再使用spark.sql 里传hive语句insert select

PySaprk 将 DataFrame 数据保存为 Hive 分区表

创建 SparkSession

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.enableHiveSupport().appName('test_app').getOrCreate()
  3. sc = spark.sparkContext
  4. hc = HiveContext(sc)

1. Spark创建分区表

  1. # 可以将append改为overwrite,这样如果表已存在会删掉之前的表,新建表
  2. df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day'])

saveAsTable 会自动创建hive表,partitionBy指定分区字段,默认存储为 parquet 文件格式。对于从文件生成的DataFrame,字段类型也是自动转换的,有时会转换成不符合要求的类型。
需要自定义字段类型的,可以在创建DataFrame时指定类型:

  1. from pyspark.sql.types import StringType, StructType, BooleanType, StructField
  2. schema = StructType([
  3. StructField("vin", StringType(), True),
  4. StructField("cust_id", StringType(), True),
  5. StructField("is_maintain", BooleanType(), True),
  6. StructField("is_wash", BooleanType(), True),
  7. StructField("pt_day", StringType(), True),
  8. ]
  9. )
  10. data = pd.read_csv('/path/to/data.csv', header=0)
  11. df = spark.createDataFrame(data, schema=schema)
  12. # 写入hive表时就是指定的数据类型了
  13. df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day'])

2、向已存在的表插入数据

2.1 Spark创建的分区表

这种情况其实和建表语句一样就可以了
不需要开启动态分区

  1. df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day'])

2.2 在Hive命令行或者Hive sql语句创建的表

  • 这里主要指和Spark创建的表的文件格式不一样,Spark默认的文件格式为PARQUET,为在命令行Hive默认的文件格式为TEXTFILE,这种区别,也导致了异常的出现。

    1. pyspark.sql.utils.AnalysisException: u"The format of the existing table default.csd_test_partition is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;"
  • 需要开启动态分区, 不开启会有异常:

    1. org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
  • 代码

    1. # 建表
    2. sql_str = "CREATE TABLE IF NOT EXISTS default.csd_test_partition (cust_id string, vin string, is_maintain boolean, is_wash boolean) partitioned by (pt_day string)"
    3. hc.sql(sql_str)
    4. # 开启动态分区
    5. spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    6. spark.sql("set hive.exec.dynamic.partition=true")
    7. # 指定文件格式
    8. df.write.saveAsTable(save_table, format='Hive', mode='append', partitionBy=['pt_day'])
  • 通过临时视图创建

    1. # 数据包含分区字段的,不要指定分区,要开启动态分区
    2. df1.createTempView('temp_table')
    3. # 或 df1.registerTempTable('temp_table')
    4. hc.sql('insert into default.csd_test_partition select * from temp_table')
    1. # 数据不包含分区字段的,可以直接指定分区插入,可以不用开启动态分区
    2. df2 = df1.drop('pt_day')
    3. df2.registerTempTable('temp_table')
    4. hc.sql('insert into default.csd_test_partition partition(pt_day="20190516") select * from temp_table1')
  • 3、总结
    3.1 df.write.saveAsTable() 方法
    mode=‘overwrite’ 模式时,会创建新的表,若表名已存在则会被删除,整个表被重写。而 mode=‘append’ 模式会在直接在原有数据增加新数据。
    3.2 sql 语句进行插入
    sql 语句插入只能先行建表,在执行插入操作。
    INSERT INTO tableName PARTITION(pt=pt_value) select from temp_table 的语句类似于 append 追加的方式。
    INSERT OVERWRITE TABLE tableName PARTITION(pt=pt_value) SELECT
    FROM temp_table 的语句能指定分区进行重写,而不会重写整张表。
    sql 语句的方式比 .write.saveAsTable() 方法更灵活。
    3.3 保存 hive 表的文件数量设置
    默认的方式将会在hive分区表中保存大量的小文件,在保存之前对 DataFrame 用 .repartition() 重新分区,这样就能控制保存的文件数量。
    如:

    1. df.repartition(5).write.saveAsTable(...)
    2. df.repartition(5).registerTempTable('temp_table')