背景

Spark Streaming 程序,一般情况下是不会人为关闭的,但是有些时候需要停服修改逻辑(下线调整)。假设我们使用Yarn 来管理提交的任务,直接使用 yarn application -kill taskId 的话,显然非常暴力。可能会造成数据的丢失或重复计算(假设从 Kafka 消费数据,还没有计算完,这时候被 kill 掉了,下次程序启动之后可能会出现重复计算或数据丢失的情况。)

设计的程序的时候,我们需要尽可能的避免使用这种暴力的方式。

那该怎么办呢?带着这个疑问,我们先来看一个参数

spark.streaming.stopGracefullyOnShutdown,如果为true,Spark将在JVM关闭时优雅地关闭StreamingContext,而不是立即关闭。参数来源

第一种:发送信号到 Driver

(1)首先需要在程序中设置 spark.streaming.stopGracefullyOnShutdown 为 true 。

(2)启动程序一段时间后,如果想要停止了,这时候需要去 Spark UI 界面上找到这个任务对应的 Driver 在哪个节点启动的。

image.png
image.png

【注】:一般我是这样启动的: _ nohup _spark-submit \ —master yarn \ —executor-cores 1 \ —executor-memory 3g \ —num-executors 6 \ —driver-memory 2g \

  1. --class com.xx.xxx.xxx \
  2. --jars ${submitJars} \
  3. --queue xxxx \
  4. --conf "spark.driver.host=xxx.xx.xx.xx" \
  5. --driver-class-path ${rootDir}/conf \
  6. --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" \
  7. --properties-file ${rootDir}/conf/spark.properties \
  8. ${rootDir}/lib/xxxx.jar _> ${rootDir}/cache/xxxxxx.txt 2>&1 _&

所以我提交任务的机器就是 Driver 节点。

(3)找到 Driver 进程的 pid, 执行 ps -ef | grep java | grep xxx,注意 xxx 一般是你提交的 Spark 应用的名字或 jar 包的名字。

(4)执行:kill -SIGTERM

当 Driver 进程收到 SIGTERM 信号之后,一般会打印下面的日志:

  1. 17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
  2. 17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
  3. ...
  4. 17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully
  5. 17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook
  6. ...
  7. 17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext
  8. ...
  9. 17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called

不过,有一个陷阱。默认情况下,spark.yarn.maxAppAttempts参数使用来自 yarn.resourcemanager.am.max taints in yarn的默认值。默认值为2。因此,在kill命令停止第一个 AM 后,YARN将自动启动另一个AM/驱动程序。你必须再杀第二个。您可以在spark提交过程中设置—conf spark.yarn.maxAppAttempts=1,但是这个参数如果设置成 1 ,就会出现 AM 是失败不重试的风险,容灾效果就变差了,谨慎选择这种方案。

所以上面这种方案,哔哔不推荐。

第二种:获取第三方系统做消息通知

思路很简单,就是定时的获取第三方系统(我这里用的是 Redis)的标识,如果需要停止,调用StreamContext对象stop方法,自己优雅的终止自己。

核心代码:第 15 行代码以及第 17 行代码。

  1. ssc.start()
  2. // 设置 redis 配置,第三方系统;
  3. val redisConf = Map("host" -> PropertiesUtils.get(s"$appName.redis.host"),
  4. "port" -> PropertiesUtils.get(s"$appName.redis.port"),
  5. "auth" -> PropertiesUtils.get(s"$appName.redis.auth"),
  6. "db" -> PropertiesUtils.get(s"$appName.redis.db")
  7. )
  8. // 下面是定时器:优雅的关掉流逝程序
  9. val batchDuration = PropertiesUtils.get(s"$appName.batchDuration").toLong * 1000L
  10. val timer = new java.util.Timer()
  11. val task = new java.util.TimerTask {
  12. override def run(): Unit = {
  13. val redisDB = RedisDB.getInstance(redisConf)
  14. // 定时判断,该应用是否需要关闭
  15. if (redisDB.get(s"spark#streaming#$appName") == "shutdown") {
  16. // 优雅的终止Spark-Streaming,保证数据不会丢失
  17. ssc.stop(stopSparkContext = true, stopGracefully = true)
  18. // 直接终止定时器,可能会导致数据丢失
  19. Thread.sleep(batchDuration)
  20. // 终止定时器
  21. cancel()
  22. timer.cancel()
  23. }
  24. redisDB.close()
  25. }
  26. }
  27. // delay:用户调用 schedule() 方法后,要等待这么长的时间才可以第一次执行run() 方法
  28. // period:第一次调用之后,从第二次开始每隔多长的时间调用一次 run() 方法
  29. timer.schedule(task, batchDuration, batchDuration)
  30. ssc.awaitTermination()


触发方式很简单,只需要把 redis 的 key 设置成 shutdown 即可。

参考文献