Kafka
Receiver和Direct两种模式
spark-streaming-kafka-0-10中已经淘汰Receiver模式
而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
SparkStreaming save HBase
https://blog.csdn.net/xianpanjia4616/article/details/85301998
SparkStreaming Select HBase
https://blog.csdn.net/LW_GHY/article/details/51673081
报错解决
object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord
set序列化即可代码如下
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
checkpoint
https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81417061
注意
分配的核数一定要大于Receiver数,要不然接受到数据没有线程去处理它。
坑收集
Q3: streaming checkpoint都有哪些坑?
A3:
从A2中可以看到,应用定义的计算函数也被序列化到checkpoint目录,当应用代码发生改变时,此时就没法从checkpoint恢复。个人感觉这是checkpoint在生产环境使用中碰到的最大障碍。
另外,当从checkpoint目录恢复streamingContext时,配置信息啥的也都是从checkpoint读取的(只有很少的一部分配置是reload的,具体见读流程),当重启任务时,新改变的配置就可能不生效,导致很奇怪的问题。
反压
Kafka DirectApproach(直连)
spark.streaming.backpressure.enabled=true
spark.streaming.kafka.maxRatePerPartition=8400
例子
#!/bin/sh
TaskName="funnel"
UserName="hadoop"
cd `dirname $0`
nohup sudo -u ${UserName} /data/bigdata/spark/bin/spark-submit \
--name ${TaskName} \
--class FunnelMain \
--master yarn \
--deploy-mode cluster \
--executor-memory 2G \
--num-executors 3 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.backpressure.initialRate=1000 \
--files /data/apps/funnel/app/conf/conf.properties \
/data/apps/funnel/app/target/apphadoop-1-jar-with-dependencies.jar conf.properties >>../log/${TaskName}.log 2>&1 &
exit 0
Kafka Offset管理
不用CheckPoint
如果Streaming程序的代码改变了,重新打包执行就会出现反序列化异常的问题。这是因为checkpoint首次持久化时会将整个jar包序列化,以便重启时恢复。重新打包之后,新旧代码逻辑不同,就会报错或者仍然执行旧版代码。
要解决这个问题,只能将HDFS上的checkpoint文件删掉,但这样也会同时删掉Kafka的offset信息,就毫无意义了。
最佳实践
mapWithState方法能够得到6倍的低延迟的同时维护的key状态的数量要多10倍
参考
https://www.jianshu.com/p/d2a61be73513
SparkStreaming 不能直接append到HDFS中。
Hive on HBase比Hive on HDFS要差
https://blog.csdn.net/bluejoe2000/article/details/47815907