背景
Spark Streaming 程序,一般情况下是不会人为关闭的,但是有些时候需要停服修改逻辑(下线调整)。假设我们使用Yarn 来管理提交的任务,直接使用 yarn application -kill taskId 的话,显然非常暴力。可能会造成数据的丢失或重复计算(假设从 Kafka 消费数据,还没有计算完,这时候被 kill 掉了,下次程序启动之后可能会出现重复计算或数据丢失的情况。)
设计的程序的时候,我们需要尽可能的避免使用这种暴力的方式。
那该怎么办呢?带着这个疑问,我们先来看一个参数
spark.streaming.stopGracefullyOnShutdown,如果为true,Spark将在JVM关闭时优雅地关闭StreamingContext,而不是立即关闭。参数来源
第一种:发送信号到 Driver
(1)首先需要在程序中设置 spark.streaming.stopGracefullyOnShutdown 为 true 。
(2)启动程序一段时间后,如果想要停止了,这时候需要去 Spark UI 界面上找到这个任务对应的 Driver 在哪个节点启动的。
【注】:一般我是这样启动的: _ nohup _spark-submit \ —master yarn \ —executor-cores 1 \ —executor-memory 3g \ —num-executors 6 \ —driver-memory 2g \
--class com.xx.xxx.xxx \
--jars ${submitJars} \
--queue xxxx \
--conf "spark.driver.host=xxx.xx.xx.xx" \
--driver-class-path ${rootDir}/conf \
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" \
--properties-file ${rootDir}/conf/spark.properties \
${rootDir}/lib/xxxx.jar _> ${rootDir}/cache/xxxxxx.txt 2>&1 _&
所以我提交任务的机器就是 Driver 节点。
(3)找到 Driver 进程的 pid, 执行 ps -ef | grep java | grep xxx,注意 xxx 一般是你提交的 Spark 应用的名字或 jar 包的名字。
(4)执行:kill -SIGTERM
当 Driver 进程收到 SIGTERM 信号之后,一般会打印下面的日志:
17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
...
17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully
17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook
...
17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext
...
17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called
不过,有一个陷阱。默认情况下,spark.yarn.maxAppAttempts参数使用来自 yarn.resourcemanager.am.max taints in yarn的默认值。默认值为2。因此,在kill命令停止第一个 AM 后,YARN将自动启动另一个AM/驱动程序。你必须再杀第二个。您可以在spark提交过程中设置—conf spark.yarn.maxAppAttempts=1,但是这个参数如果设置成 1 ,就会出现 AM 是失败不重试的风险,容灾效果就变差了,谨慎选择这种方案。
所以上面这种方案,哔哔不推荐。
第二种:获取第三方系统做消息通知
思路很简单,就是定时的获取第三方系统(我这里用的是 Redis)的标识,如果需要停止,调用StreamContext对象stop方法,自己优雅的终止自己。
核心代码:第 15 行代码以及第 17 行代码。
ssc.start()
// 设置 redis 配置,第三方系统;
val redisConf = Map("host" -> PropertiesUtils.get(s"$appName.redis.host"),
"port" -> PropertiesUtils.get(s"$appName.redis.port"),
"auth" -> PropertiesUtils.get(s"$appName.redis.auth"),
"db" -> PropertiesUtils.get(s"$appName.redis.db")
)
// 下面是定时器:优雅的关掉流逝程序
val batchDuration = PropertiesUtils.get(s"$appName.batchDuration").toLong * 1000L
val timer = new java.util.Timer()
val task = new java.util.TimerTask {
override def run(): Unit = {
val redisDB = RedisDB.getInstance(redisConf)
// 定时判断,该应用是否需要关闭
if (redisDB.get(s"spark#streaming#$appName") == "shutdown") {
// 优雅的终止Spark-Streaming,保证数据不会丢失
ssc.stop(stopSparkContext = true, stopGracefully = true)
// 直接终止定时器,可能会导致数据丢失
Thread.sleep(batchDuration)
// 终止定时器
cancel()
timer.cancel()
}
redisDB.close()
}
}
// delay:用户调用 schedule() 方法后,要等待这么长的时间才可以第一次执行run() 方法
// period:第一次调用之后,从第二次开始每隔多长的时间调用一次 run() 方法
timer.schedule(task, batchDuration, batchDuration)
ssc.awaitTermination()
触发方式很简单,只需要把 redis 的 key 设置成 shutdown 即可。