Dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
Code
import json
import time
from typing import Optional

import redis
from redis.client import StrictRedis
from pyspark import RDD
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# Redis connection settings, plus the key the offsets are stored under and
# the key polled for the graceful-stop flag.
redis_config = {
    "host": "localhost",
    "port": 6379,
    "db": 3,
    "password": "123456",
    "redis_offset_key": "test_offset4",
    "redis_stop_key": "stop_test"
}

kafka_config = {
    "topic": ['my_topic'],
    "kafka_params": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "wsy3_group_id",
        "auto.offset.reset": "smallest"
    }
}


def get_redis_client(redis_config: dict) -> StrictRedis:
    """Build a Redis client backed by a connection pool."""
    pool = redis.ConnectionPool(host=redis_config["host"], port=redis_config["port"],
                                db=redis_config["db"], password=redis_config["password"])
    return redis.StrictRedis(connection_pool=pool)


def get_redis_fromOffsets(redis_client: StrictRedis) -> Optional[dict]:
    """Load the last committed offsets from Redis, or return None on a first run."""
    redis_fromOffsets = None
    redis_offset_key = redis_config['redis_offset_key']
    existing_offset = redis_client.get(redis_offset_key)
    if existing_offset:
        print("----------------- resuming from offsets stored in Redis -----------------")
        # Sample of the JSON stored in Redis:
        # [{"topic": "topic", "partition": 0, "fromOffset": 7953207, "untilOffset": 7953226},
        #  {"topic": "topic", "partition": 1, "fromOffset": 7951255, "untilOffset": 7951274}]
        topic_partition_offset_list = json.loads(existing_offset.decode())
        topicAndPartition_offset_dict = {}
        for x in topic_partition_offset_list:
            print("*************** redis offset: %s ***************" % x)
            topicAndPartition = TopicAndPartition(x['topic'], x['partition'])
            topicAndPartition_offset_dict[topicAndPartition] = x['fromOffset']
        redis_fromOffsets = topicAndPartition_offset_dict
    else:
        print("----------------- no saved offsets, starting fresh -----------------")
    return redis_fromOffsets


def graceful_stop(ssc: StreamingContext, redis_client: StrictRedis, redis_stop_key: str) -> None:
    """Poll Redis for a stop flag and shut the context down gracefully when it appears."""
    is_stop = False
    while not is_stop:
        print("----------------- waiting for stop flag -----------------")
        is_stop = ssc.awaitTerminationOrTimeout(30)
        if not is_stop and redis_client.exists(redis_stop_key) \
                and "stop" == redis_client.get(redis_stop_key).decode():
            print("----------------- stopping in 10 seconds -----------------")
            time.sleep(10)
            # stopSparkContext=True, stopGraceFully=True: finish the in-flight batch first.
            ssc.stop(True, True)


def main():
    spark = SparkSession.builder \
        .master('local') \
        .appName('write_offset_to_redis') \
        .getOrCreate()
    spark.sparkContext.setLogLevel('INFO')
    ssc = StreamingContext(spark.sparkContext, 5)

    redis_client = get_redis_client(redis_config)
    # On a first run this is None, so createDirectStream falls back to auto.offset.reset.
    redis_fromOffsets = get_redis_fromOffsets(redis_client)
    directKafkaStream = KafkaUtils.createDirectStream(ssc, kafka_config['topic'],
                                                      kafka_config['kafka_params'],
                                                      fromOffsets=redis_fromOffsets)

    def storeOffsetRanges(rdd: RDD) -> RDD:
        # Must be the first transformation on the direct stream: offsetRanges()
        # only exists on the KafkaRDD produced by createDirectStream.
        offsetRanges = rdd.offsetRanges()
        offset_redis = []
        for o in offsetRanges:
            offset_redis.append({"topic": o.topic, "partition": o.partition,
                                 "fromOffset": o.fromOffset, "untilOffset": o.untilOffset})
            print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
        redis_client.set(redis_config['redis_offset_key'], json.dumps(offset_redis))
        return rdd

    def printOffsetRanges(rdd: RDD) -> None:
        # Placeholder for the actual per-batch processing.
        pass

    directKafkaStream \
        .transform(storeOffsetRanges) \
        .foreachRDD(printOffsetRanges)

    ssc.start()
    graceful_stop(ssc, redis_client, redis_config['redis_stop_key'])
    ssc.awaitTermination()


if __name__ == '__main__':
    main()
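To trigger a graceful shutdown from outside the job, write the string "stop" to the stop key that graceful_stop polls. A minimal trigger sketch, reusing the connection settings from redis_config above:

import redis

# Write the sentinel value that graceful_stop() checks every 30 seconds.
client = redis.StrictRedis(host='localhost', port=6379, db=3, password='123456')
client.set('stop_test', 'stop')

The same flag can be set from the command line with redis-cli (-n selects db 3, -a supplies the password): redis-cli -n 3 -a 123456 set stop_test stop. Once the flag is seen, the job waits 10 seconds and then stops after the in-flight batch completes.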
Launch shell script
spark2-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
  --master local \
  /my/dir/h.py
Recommendation
The 0.8 Kafka integration used here is outdated and no longer recommended.
Structured Streaming is strongly recommended instead; engineering code is covered in the article below:
Spark + Redis: managing Kafka offsets and graceful stop
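For orientation only (this is not the code from the linked article), a minimal Structured Streaming sketch reading the same topic might look as follows; the console sink and the checkpoint path /tmp/ckpt/my_topic are placeholder assumptions, and the spark-sql-kafka-0-10 package is assumed to be on the classpath. Structured Streaming records Kafka offsets in the checkpoint directory, so the hand-rolled Redis bookkeeping above is no longer needed:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('structured_kafka_sketch').getOrCreate()

# Offsets are tracked automatically via the checkpoint directory,
# replacing the manual Redis offset management shown earlier.
df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'my_topic') \
    .option('startingOffsets', 'earliest') \
    .load()

query = df.selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)') \
    .writeStream \
    .format('console') \
    .option('checkpointLocation', '/tmp/ckpt/my_topic') \
    .start()

query.awaitTermination()

Shutdown is also simpler in this model: calling query.stop() stops the query, and on restart it resumes from the checkpointed offsets.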
