Spark SQL

2021-07-04

Problem: read.json cannot read the JSON file

Error message:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column

Cause: the JSON file format is incorrect (Chinese text needs special handling; spaces must be added). A minimal read sketch follows.
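For reference, a minimal read sketch (the session and file path are only illustrative): by default read.json expects JSON Lines, i.e. one complete, well-formed JSON object per line; malformed lines end up in the internal _corrupt_record column, which is what triggers the error above.

  import org.apache.spark.sql.SparkSession

  // Illustrative session; adjust master/appName as needed.
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ReadJsonDemo")
    .getOrCreate()

  // Each line of the file must be one complete JSON object,
  // e.g. {"name": "张 三", "age": 30}
  val df = spark.read.json("hdfs:///data/demo/people.json")  // hypothetical path
  df.show(false)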

2021-07-11

Problem: the metastore database used at runtime is not MySQL

Error message:
21/07/11 14:13:39 ERROR Schema: Failed initialising database.
Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@b1f36e5, see the next exception for details.

Cause: the MySQL configuration cannot be found in hive-site.xml (this mainly happens with Cloudera Manager installations of CDH; it generally does not occur otherwise).
Solution: manually add the MySQL connection settings (a verification sketch follows the snippet):

  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://10.168.1.12:3306/metastore</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>password</value>
  </property>

Problem: dynamic partitioning causes partition creation to fail

Error message:
Exception in thread "main" org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:157)

Cause:
Dynamic partitioning requires the partition parameters to be set first.
hive.exec.dynamic.partition defaults to false, so the switch has to be turned on explicitly.
hive.exec.dynamic.partition.mode defaults to strict, which disallows fully dynamic partitions, so it has to be changed to nonstrict.

Solution: set it in code

  // Dynamic partitioning requires the partition parameters to be set first.
  // hive.exec.dynamic.partition defaults to false, so turn the switch on explicitly.
  ss.sqlContext.setConf("hive.exec.dynamic.partition", "true");
  ss.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");

Tip: you can either pass these settings via config when building the SparkSession, or set them with setConf after the session is created, as sketched below.
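A sketch of the first variant (passing the settings via .config() while building the SparkSession; master and app name are illustrative):

  import org.apache.spark.sql.SparkSession

  // Set the dynamic-partition switches up front instead of calling setConf later.
  val ss = SparkSession.builder()
    .master("local[*]")
    .appName("DynamicPartitionDemo")
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()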

2021-07-21

Problem: Spark Streaming fails to serialize Kafka data


  java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
  Serialization stack:
  - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = demoTopic, partition = 0, offset = 73724625, CreateTime = 1626837252102, serialized key size = 8, serialized value size = 59, headers = RecordHeaders(headers = [], isReadOnly = false), key = 73637709, value = 2021-07-21 11:14:12,2021-07-21,73637709,刘慕青,31.941786))
  - element of array (index: 0)
  - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

Solution: add the serializer setting when constructing the SparkConf (a fuller consumption sketch follows the snippet):
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  val ssc = new StreamingContext(
    new SparkConf().setMaster("local[*]").setAppName("Kafka_To_Kudu")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    Seconds(1))
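For completeness, a minimal consumption sketch on top of that ssc (spark-streaming-kafka-0-10 API; the broker address and group id are placeholders). Mapping each ConsumerRecord to its key/value before any shuffle also avoids shipping the non-serializable records between executors:

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  // Placeholder Kafka client settings.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "kafka-broker:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "demo_group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Seq("demoTopic"), kafkaParams))

  // Extract plain (key, value) pairs early so the ConsumerRecord objects
  // themselves never need to be serialized.
  stream.map(record => (record.key, record.value)).print()

  ssc.start()
  ssc.awaitTermination()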

2021-08-31

Problem: Spark SQL reads from Kudu and fails when writing to a partitioned Hive table

  21/08/31 10:00:20 ERROR main.JustMain$: Output Hive table `realtimebakup`.`dcs_unit1_bakup` is bucketed but Spark currently does NOT populate bucketed output which is compatible with Hive.;
  org.apache.spark.sql.AnalysisException: Output Hive table `realtimebakup`.`dcs_unit1_bakup` is bucketed but Spark currently does NOT populate bucketed output which is compatible with Hive.;
  at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:180)

Solution: turn off bucketing and also turn off sorting enforcement

  beeline> set hive.enforce.bucketing=false;
  beeline> set hive.enforce.sorting=false;

You can also modify hive-site.xml:

  <property>
    <name>hive.enforce.bucketing</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.enforce.sorting</name>
    <value>false</value>
  </property>

Or set it in code:

  // Dynamic partitioning requires the partition parameters to be set first.
  // hive.exec.dynamic.partition defaults to false, so turn the switch on explicitly.
  ss.sqlContext.setConf("hive.exec.dynamic.partition", "true");
  // Non-strict mode
  ss.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");
  // Turn off bucketing and sorting enforcement, otherwise writing to Hive fails with:
  // ... is bucketed but Spark currently does
  // NOT populate bucketed output which is compatible with Hive
  ss.sqlContext.setConf("hive.enforce.bucketing", "false")
  ss.sqlContext.setConf("hive.enforce.sorting", "false")
  log.info("Obtained SparkSession: " + ss)

Problem: Spark SQL generates many small files on HDFS

Solution: use group by (or distribute by) together with setting the number of reducers.
Spark controls small files on the same principle as Hive: executing a group by or distribute by forces a reduce (shuffle) stage, which limits the number of output files.

  insert into table TABLE_NAME
  select ...... group by col;

  insert into table TABLE_NAME
  select ...... distribute by rand();

The syntax for setting the number of reducers differs slightly from Hive:

  set spark.sql.shuffle.partitions=X;

If the number of reducers is set too small while the data volume is large, most of the data may flow into only a few nodes, making the final stage run slowly. So a smaller X is not always better; choose the value according to the actual data size. A programmatic equivalent is sketched below.
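The same setting can also be applied in code (the value 200 below is only an illustrative choice, not a recommendation):

  // Equivalent to "set spark.sql.shuffle.partitions=X;" in SQL.
  ss.sqlContext.setConf("spark.sql.shuffle.partitions", "200")

  // For DataFrame writes, repartition() plays the same role as distribute by:
  // it fixes the number of output files, e.g.
  // df.repartition(200).write.insertInto("TABLE_NAME")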

2021-12-13

Problem: IDEA cannot run the Spark program

Error message:

  scalac: Token not found: C:\Users\xjr76\AppData\Local\JetBrains\IdeaIC2021.1\compile-server\tokens\3200

Cause unknown (possibly because the project had not been run for a long time). Solution: in the Plugins settings, disable the Scala plugin, then re-enable it, and restart IDEA when prompted.