优雅关闭

  package com.atguigu.bigdata.spark.streaming

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

  /**
   * Demo: gracefully shutting down a Spark Streaming application.
   *
   * The job reads text lines from a socket on localhost:9999 in 3-second
   * batches, pairs each line with the count 1 and prints the result. A
   * separate thread waits five seconds, stops the streaming context
   * gracefully if it is still active, and then exits the JVM.
   */
  object SparkStreaming08_Close {

    /**
     * Entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      // Background: a plain JVM thread can be shut down forcibly, e.g.
      //   val thread = new Thread()
      //   thread.start()
      //   thread.stop() // forced shutdown
      // The code below shows the graceful alternative for a streaming job.

      val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
      val ssc = new StreamingContext(conf, Seconds(3))

      val socketLines = ssc.socketTextStream("localhost", 9999)
      val pairs: DStream[(String, Int)] = socketLines.map(line => (line, 1))
      pairs.print()

      ssc.start()

      // Stopping the receiver has to happen on a separate thread, because the
      // main thread is blocked in awaitTermination() below. The decision to
      // stop should come from a shutdown flag kept in an external system.
      val shutdownTask = new Runnable {
        override def run(): Unit = {
          // Graceful shutdown: the compute nodes stop accepting new data,
          // finish processing the data they already have, and then shut down.
          //
          // Possible places to keep the external shutdown flag:
          //   MySQL : Table(stopSpark) => Row => data
          //   Redis : Data(K-V)
          //   ZK    : /stopSpark
          //   HDFS  : /stopSpark
          //
          // Sketch of a polling loop that would check such a flag:
          //   while (true) {
          //     if (true) { // replace with the real flag lookup
          //       // read the current state of the streaming context
          //       val state: StreamingContextState = ssc.getState()
          //       if (state == StreamingContextState.ACTIVE) {
          //         ssc.stop(true, true)
          //       }
          //     }
          //     Thread.sleep(5000)
          //   }

          // Demo shortcut: instead of polling, wait five seconds and stop.
          Thread.sleep(5000)
          if (ssc.getState() == StreamingContextState.ACTIVE) {
            ssc.stop(stopSparkContext = true, stopGracefully = true)
          }
          System.exit(0)
        }
      }
      new Thread(shutdownTask).start()

      // Blocks the main thread until the streaming context terminates.
      ssc.awaitTermination()
    }
  }

优雅恢复

  package com.atguigu.bigdata.spark.streaming

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

  /**
   * Demo: resuming a Spark Streaming application from checkpoint data.
   *
   * The streaming context is obtained through
   * `StreamingContext.getActiveOrCreate`, passing the checkpoint directory
   * and a factory function. The factory is only needed when a brand-new
   * context has to be built; see the StreamingContext API documentation for
   * the exact lookup order (active context / checkpoint / factory).
   */
  object SparkStreaming09_Resume {

    /** Checkpoint directory, used both for recovery and for writing new checkpoints. */
    private val CheckpointDir = "cp"

    /**
     * Builds a fully configured streaming context: 3-second batches, a socket
     * source on localhost:9999, each line paired with the count 1, printed.
     *
     * The checkpoint directory is set here, inside the factory, so that a
     * newly created context is complete on its own and a context obtained
     * by other means is not reconfigured afterwards.
     *
     * @return the new, not yet started, streaming context
     */
    private def createContext(): StreamingContext = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
      val newContext = new StreamingContext(sparkConf, Seconds(3))
      newContext.checkpoint(CheckpointDir)

      val lines = newContext.socketTextStream("localhost", 9999)
      val wordToOne = lines.map((_, 1))
      wordToOne.print()

      newContext
    }

    /**
     * Entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      val ssc = StreamingContext.getActiveOrCreate(CheckpointDir, () => createContext())

      ssc.start()
      // Blocks the main thread until the streaming context terminates.
      ssc.awaitTermination()
    }
  }