Spark SQL
2021-07-04
Problem: read.json cannot read the JSON file
Exception in thread "main" org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
Cause: the JSON file is not formatted correctly (Chinese text needs special handling; spaces have to be added). When none of the records can be parsed, the only column left to query is the internal corrupt-record column, which is exactly what Spark 2.3+ rejects.
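A minimal sketch of reading such a file more defensively, assuming an existing SparkSession named ss (the name used in the code later in these notes) and a hypothetical path data/people.json; multiLine, mode and columnNameOfCorruptRecord are standard options of the JSON reader, not something taken from the original error:
// Read JSON leniently: allow multi-line records and keep unparseable rows
// in a dedicated column instead of failing.
val df = ss.read
  .option("multiLine", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("data/people.json")
// Cache (or save) the parsed result first if you need to inspect the
// corrupt-record column; querying it directly on the raw file is what
// Spark 2.3+ rejects with the exception above.
df.cache()
df.show(false)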
2021-07-11
Problem: at runtime the metastore database being used is not MySQL
Error message:
21/07/11 14:13:39 ERROR Schema: Failed initialising database.
Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@b1f36e5, see the next exception for details.
Cause: hive-site.xml contains no MySQL configuration (this mainly happens on CDH clusters installed through Cloudera Manager; it rarely occurs otherwise).
Solution: add the MySQL connection settings manually:
<!-- Hive metastore connection (MySQL) -->
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://10.168.1.12:3306/metastore</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>password</value>
</property>
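A quick way to confirm the session is actually using the MySQL-backed metastore is to enable Hive support and list the databases. A minimal sketch, assuming spark-hive is on the classpath and the hive-site.xml above is visible to the application:
import org.apache.spark.sql.SparkSession

// Build a session with Hive support so hive-site.xml (and the MySQL
// metastore settings above) are picked up instead of a local Derby database.
val ss = SparkSession.builder()
  .appName("MetastoreCheck")
  .enableHiveSupport()
  .getOrCreate()

// If this lists the databases defined in the Hive metastore rather than
// just "default", the MySQL connection is being used.
ss.sql("show databases").show()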
Problem: the insert fails because of dynamic-partition strict mode
Error message:
Exception in thread "main" org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:157)
Cause:
Dynamic partitioning is being used, so the partition parameters have to be set first.
hive.exec.dynamic.partition defaults to false, so the switch must be turned on explicitly.
hive.exec.dynamic.partition.mode defaults to strict, which disallows fully dynamic partitions, so it must be changed to nonstrict.
Solution: set the options in code:
// Dynamic partitioning is used, so the partition parameters must be set first.
// The default is false, so the switch has to be turned on explicitly.
ss.sqlContext.setConf("hive.exec.dynamic.partition", "true")
ss.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Tip: the options can be passed via config() when the session is created, or set with setConf() after connecting; both work (see the sketch below).
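For reference, a minimal sketch of the config() variant; the application name is hypothetical, and only the two dynamic-partition options come from the note above:
import org.apache.spark.sql.SparkSession

// Set the dynamic-partition options while the session is being built.
val ss = SparkSession.builder()
  .appName("DynamicPartitionInsert")   // hypothetical name
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()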
2021-07-21
Problem: Spark Streaming fails to serialize Kafka records
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = demoTopic, partition = 0, offset = 73724625, CreateTime = 1626837252102, serialized key size = 8, serialized value size = 59, headers = RecordHeaders(headers = [], isReadOnly = false), key = 73637709, value = 2021-07-21 11:14:12,2021-07-21,73637709,刘慕青,31.941786))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
Solution: when constructing the SparkConf, set spark.serializer to org.apache.spark.serializer.KryoSerializer:
val ssc = new StreamingContext(
  new SparkConf().setMaster("local[*]").setAppName("Kafka_To_Kudu")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
  Seconds(1))
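Another common way around this exception (not from the original note) is to map each ConsumerRecord to its value before anything forces the records themselves to be serialized, e.g. caching, windowing, or collecting an array of records. A rough sketch using the spark-streaming-kafka-0-10 API, with a hypothetical broker address and group id, and the demoTopic topic from the log above:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",   // hypothetical broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "demo-group",              // hypothetical group id
  "auto.offset.reset" -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("demoTopic"), kafkaParams))

// Extract plain strings right away, so ConsumerRecord itself never needs
// to be serialized by Spark.
val values = stream.map(record => record.value())
values.print()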
2021-08-31
Problem: Spark SQL reads from Kudu and fails when writing to a partitioned Hive table
21/08/31 10:00:20 ERROR main.JustMain$: Output Hive table `realtimebakup`.`dcs_unit1_bakup` is bucketed but Spark currently does NOT populate bucketed output which is compatible with Hive.;
org.apache.spark.sql.AnalysisException: Output Hive table `realtimebakup`.`dcs_unit1_bakup` is bucketed but Spark currently does NOT populate bucketed output which is compatible with Hive.;
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:180)
Solution: disable enforcement of bucketing and sorting
beeline> set hive.enforce.bucketing=false;
beeline> set hive.enforce.sorting=false;
Alternatively, edit hive-site.xml:
<property>
  <name>hive.enforce.bucketing</name>
  <value>false</value>
</property>
<property>
  <name>hive.enforce.sorting</name>
  <value>false</value>
</property>
Or set it in code:
// Dynamic partitioning is used, so the partition parameters must be set first.
// The default is false, so the switch has to be turned on explicitly.
ss.sqlContext.setConf("hive.exec.dynamic.partition", "true")
// Non-strict mode
ss.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Disable enforcement of bucketing and sorting, otherwise the write to Hive fails with:
// ... is bucketed but Spark currently does
// NOT populate bucketed output which is compatible with Hive
ss.sqlContext.setConf("hive.enforce.bucketing", "false")
ss.sqlContext.setConf("hive.enforce.sorting", "false")
log.info("Obtained SparkSession: " + ss)
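For context, a rough sketch of the Kudu-to-Hive flow these settings are meant for; the Kudu master address and source table name are hypothetical, and the kudu data source requires the kudu-spark dependency:
// Read the source table from Kudu.
val kuduDf = ss.read
  .format("kudu")
  .option("kudu.master", "10.168.1.13:7051")            // hypothetical master
  .option("kudu.table", "impala::realtime.dcs_unit1")   // hypothetical table
  .load()

// Append into the partitioned (non-bucketed) Hive table; with the
// dynamic-partition settings above, insertInto matches columns by position,
// so the partition column must be the last column of kuduDf.
kuduDf.write.mode("append").insertInto("realtimebakup.dcs_unit1_bakup")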
Problem: Spark SQL produces many small files on HDFS
Solution: use group by / distribute by and set the number of shuffle partitions
Spark controls small files on the same principle as Hive: a group by or distribute by forces a reduce (shuffle) stage, which in turn limits the number of output files.
insert into table TABLE_NAME
select ...... group by col;
insert into table TABLE_NAME
select ...... distribute by rand();
The syntax for setting the number of reducers differs slightly from Hive:
set spark.sql.shuffle.partitions=X;
If the number of reducers (X) is set too low while the data volume is large, most of the data may flow into just a few tasks, and the final stage of the job becomes very slow. So a smaller X is not always better; it has to be weighed against the actual data size.
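The same idea from the code side, as a minimal sketch; resultDf, the view name and the target table are hypothetical, and 16 is just an example value:
// Limit the number of shuffle partitions, and therefore output files.
ss.sqlContext.setConf("spark.sql.shuffle.partitions", "16")

// distribute by rand() forces a shuffle, so the insert writes at most
// spark.sql.shuffle.partitions files per table partition.
resultDf.createOrReplaceTempView("result_src")
ss.sql("insert into table target_table select * from result_src distribute by rand()")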
2021-12-13
Problem: IDEA cannot run the Spark program
Error message:
scalac: Token not found: C:\Users\xjr76\AppData\Local\JetBrains\IdeaIC2021.1\compile-server\tokens\3200
Cause unknown (possibly because the project had not been run for a long time). Solution: in the Plugins settings, disable the Scala plugin, then re-enable it and restart IDEA when prompted.