Dependencies

  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.3.0</version>
  </dependency>

Code

  import redis
  from redis.client import StrictRedis
  from pyspark.sql import SparkSession
  from pyspark.streaming import StreamingContext
  from pyspark.streaming.kafka import KafkaUtils
  from pyspark.streaming.kafka import OffsetRange, TopicAndPartition
  from pyspark import RDD
  import json
  import time

  redis_config = {
      "host": "localhost",
      "port": 6379,
      "db": 3,
      "password": "123456",
      "redis_offset_key": "test_offset4",
      "redis_stop_key": "stop_test"
  }

  kafka_config = {
      "topic": ['my_topic'],
      "kafka_params": {
          "bootstrap.servers": "localhost:9092",
          "group.id": "wsy3_group_id",
          "auto.offset.reset": "smallest"
      }
  }

  def get_redis_client(redis_config: dict) -> StrictRedis:
      pool = redis.ConnectionPool(host=redis_config["host"], port=redis_config["port"],
                                  db=redis_config["db"], password=redis_config["password"])
      redis_client = redis.StrictRedis(connection_pool=pool)
      return redis_client

  def get_redis_fromOffsets(redis_client: StrictRedis) -> dict:
      # Read the last committed offsets from Redis and build the
      # {TopicAndPartition: offset} dict expected by createDirectStream.
      # Returns None when no offsets have been stored yet.
      redis_fromOffsets = None
      redis_offset_key = redis_config['redis_offset_key']
      existing_offset = redis_client.get(redis_offset_key)
      if existing_offset:
          print("----------------- starting from Redis offsets -----------------")
          # Sample of the data stored in Redis:
          # [{"topic": "topic", "partition": 0, "fromOffset": 7953207, "untilOffset": 7953226},
          #  {"topic": "topic", "partition": 1, "fromOffset": 7951255, "untilOffset": 7951274}]
          topic_partition_offset_list = json.loads(existing_offset.decode())
          topicAndPartition_offset_dict = {}
          for x in topic_partition_offset_list:
              print("*************** redis offset ***************")
              print(x)
              print("*************** redis offset ***************")
              topicAndPartition = TopicAndPartition(x['topic'], x['partition'])
              offset = x['fromOffset']
              topicAndPartition_offset_dict.update({topicAndPartition: offset})
          redis_fromOffsets = topicAndPartition_offset_dict
      else:
          print("-------------- cold start, no stored offsets --------------")
      return redis_fromOffsets

  def graceful_stop(ssc: StreamingContext, redis_client: StrictRedis, redis_stop_key: str) -> None:
      # Poll Redis every 30 seconds; when the stop key holds the string "stop",
      # shut the StreamingContext down gracefully (in-flight batches finish first).
      is_stop = False
      while not is_stop:
          print("------------------ waiting for stop signal ------------------")
          is_stop = ssc.awaitTerminationOrTimeout(30)
          if not is_stop and redis_client.exists(redis_stop_key) \
                  and "stop" == redis_client.get(redis_stop_key).decode():
              print("----------- stopping in 10 seconds -----------")
              time.sleep(10)
              # stopSparkContext=True, stopGraceFully=True
              ssc.stop(True, True)
              is_stop = True

  def main():
      spark = SparkSession.builder \
          .master('local') \
          .appName('write_offset_to_redis') \
          .getOrCreate()
      spark.sparkContext.setLogLevel('INFO')
      ssc = StreamingContext(spark.sparkContext, 5)
      redis_client = get_redis_client(redis_config)
      # None on the first run (falls back to auto.offset.reset); otherwise resume from Redis.
      redis_fromOffsets = get_redis_fromOffsets(redis_client)
      directKafkaStream = KafkaUtils.createDirectStream(ssc, kafka_config['topic'],
                                                        kafka_config['kafka_params'],
                                                        fromOffsets=redis_fromOffsets)

      def storeOffsetRanges(rdd: RDD) -> RDD:
          # transform() is evaluated on the driver for every batch,
          # so the driver-side Redis client is usable here.
          offsetRanges = rdd.offsetRanges()
          offset_redis = []
          for o in offsetRanges:
              offset = {
                  "topic": o.topic,
                  "partition": o.partition,
                  "fromOffset": o.fromOffset,
                  "untilOffset": o.untilOffset
              }
              offset_redis.append(offset)
              print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
          redis_offset_value = json.dumps(offset_redis)
          redis_offset_key = redis_config['redis_offset_key']
          redis_client.set(redis_offset_key, redis_offset_value)
          return rdd

      def printOffsetRanges(rdd: RDD) -> None:
          # Output action that forces each batch to run; replace with real processing.
          pass

      directKafkaStream \
          .transform(storeOffsetRanges) \
          .foreachRDD(printOffsetRanges)
      ssc.start()
      graceful_stop(ssc, redis_client, redis_config['redis_stop_key'])
      ssc.awaitTermination()

  if __name__ == '__main__':
      main()
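
To trigger the graceful shutdown, write the literal string stop to the configured stop key from any Redis client. A minimal sketch, assuming the same connection values as redis_config above:

  import redis

  # Same host/port/db/password as redis_config above.
  client = redis.StrictRedis(host="localhost", port=6379, db=3, password="123456")
  client.set("stop_test", "stop")  # graceful_stop() will see this within 30 seconds

Remember to delete the key (client.delete("stop_test")) before relaunching, otherwise the job will shut itself down again right after starting.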

Launch shell script

  spark2-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
  --master local \
  /my/dir/h.py
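
To verify that offsets are actually committed after each batch, the stored JSON can be read back; a minimal sketch, again assuming the connection values from redis_config:

  import json
  import redis

  client = redis.StrictRedis(host="localhost", port=6379, db=3, password="123456")
  raw = client.get("test_offset4")
  if raw:
      # The same list-of-dicts layout written by storeOffsetRanges().
      for entry in json.loads(raw.decode()):
          print(entry["topic"], entry["partition"], entry["fromOffset"], entry["untilOffset"])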

Recommendation

This Kafka integration (spark-streaming-kafka-0-8) is very old and not recommended.
Structured Streaming is strongly recommended instead; production code is covered in:
Spark + Redis: managing Kafka offsets and graceful shutdown
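
For orientation only, here is a minimal Structured Streaming sketch (not the linked project's code): the built-in Kafka source persists offsets in the checkpoint directory, so the hand-rolled Redis bookkeeping above is no longer needed. The topic, servers, and checkpoint path are placeholders.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("structured_kafka").getOrCreate()

  # Offsets are stored in and recovered from checkpointLocation automatically.
  df = spark.readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "my_topic") \
      .option("startingOffsets", "earliest") \
      .load()

  query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("console") \
      .option("checkpointLocation", "/tmp/structured_kafka_ckpt") \
      .start()

  query.awaitTermination()

This source needs the spark-sql-kafka-0-10 package on the classpath (for Spark 2.3.0 / Scala 2.11: --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0), and a graceful stop can be implemented by polling a flag and calling query.stop().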