Dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
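Note: for the PySpark job below this dependency is not built into the application; it is pulled in at submit time with --packages, as shown in the launch script at the end.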
Code
import redis
from redis.client import StrictRedis
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import OffsetRange, TopicAndPartition
from pyspark import RDD
import json
import time
redis_config = {
    "host": "localhost",
    "port": 6379,
    "db": 3,
    "password": "123456",
    "redis_offset_key": "test_offset4",
    "redis_stop_key": "stop_test"
}

kafka_config = {
    "topic": ['my_topic'],
    "kafka_params": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "wsy3_group_id",
        "auto.offset.reset": "smallest"
    }
}
def get_redis_client(redis_config: dict) -> StrictRedis:
    pool = redis.ConnectionPool(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"],
                                password=redis_config["password"])
    redis_client = redis.StrictRedis(connection_pool=pool)
    return redis_client
def get_redis_fromOffsets(redis_client: StrictRedis) -> dict:
    redis_fromOffsets = None
    redis_offset_key = redis_config['redis_offset_key']
    existing_offset = redis_client.get(redis_offset_key)
    if existing_offset:
        print("----------------- starting from offsets stored in redis -----------------------")
        # Example of the offset data stored in redis:
        # [{"topic": "topic", "partition": 0, "fromOffset": 7953207, "untilOffset": 7953226},
        #  {"topic": "topic", "partition": 1, "fromOffset": 7951255, "untilOffset": 7951274}]
        topic_partition_offset_list = json.loads(existing_offset.decode())
        topicAndPartition_offset_dict = {}
        for x in topic_partition_offset_list:
            print("*************** redis offset ------------------")
            print(x)
            print("*************** redis offset ------------------")
            topicAndPartition = TopicAndPartition(x['topic'], x['partition'])
            offset = x['fromOffset']
            topicAndPartition_offset_dict.update({
                topicAndPartition: offset
            })
        redis_fromOffsets = topicAndPartition_offset_dict
    else:
        print("-------------- no stored offsets, starting from scratch ----------------------------")
    return redis_fromOffsets
def graceful_stop(ssc: StreamingContext, redis_client: StrictRedis, redis_stop_key: str) -> None:
    is_stop = False
    while not is_stop:
        print("------------------ waiting for stop signal ------------------------")
        is_stop = ssc.awaitTerminationOrTimeout(30)
        if not is_stop and redis_client.exists(redis_stop_key) and "stop" == redis_client.get(redis_stop_key).decode():
            print("----------- stopping in 10 seconds -------------------")
            time.sleep(10)
            # Stop the SparkContext too, and let in-flight batches finish first
            ssc.stop(True, True)
def main():
    spark = SparkSession.builder \
        .master('local') \
        .appName('write_offset_to_redis') \
        .getOrCreate()
    spark.sparkContext.setLogLevel('INFO')
    ssc = StreamingContext(spark.sparkContext, 5)
    redis_client = get_redis_client(redis_config)
    redis_fromOffsets = get_redis_fromOffsets(redis_client)
    # With no offsets stored in redis, fromOffsets is None and consumption
    # starts according to auto.offset.reset ("smallest" above)
    directKafkaStream = KafkaUtils.createDirectStream(ssc, kafka_config['topic'], kafka_config['kafka_params'],
                                                      fromOffsets=redis_fromOffsets)

    def storeOffsetRanges(rdd: RDD) -> RDD:
        offsetRanges = rdd.offsetRanges()
        offset_redis = []
        for o in offsetRanges:
            offset = {
                "topic": o.topic,
                "partition": o.partition,
                "fromOffset": o.fromOffset,
                "untilOffset": o.untilOffset
            }
            offset_redis.append(offset)
            print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
        redis_offset_value = json.dumps(offset_redis)
        redis_offset_key = redis_config['redis_offset_key']
        redis_client.set(redis_offset_key, redis_offset_value)
        return rdd

    def printOffsetRanges(rdd: RDD) -> None:
        pass

    directKafkaStream \
        .transform(storeOffsetRanges) \
        .foreachRDD(printOffsetRanges)
    ssc.start()
    graceful_stop(ssc, redis_client, redis_config['redis_stop_key'])
    ssc.awaitTermination()


if __name__ == '__main__':
    main()
Launch shell script
spark2-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
--master local \
/my/dir/h.py
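To trigger a graceful shutdown, write the string "stop" to the configured stop key; graceful_stop checks it every 30 seconds and then stops the StreamingContext cleanly. A minimal sketch, reusing the example values from redis_config above (host, port, db, password and stop key are those example values, not fixed names):

import redis

# connect with the same settings as redis_config in the job
client = redis.StrictRedis(host="localhost", port=6379, db=3, password="123456")

# ask the running job to finish the current batch and stop gracefully
client.set("stop_test", "stop")

# delete the key again before the next start, otherwise the restarted job
# would stop itself right away
# client.delete("stop_test")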
Recommendation
This version (the spark-streaming-kafka-0-8 integration) is quite old and is not recommended.
Structured Streaming is strongly recommended instead; engineering code is in the following post:
Spark + Redis: managing Kafka offsets and graceful stop
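For reference, a minimal sketch of what a Structured Streaming version can look like (not the code from the post above; topic, broker and checkpoint path are the example values used earlier, and the job needs the spark-sql-kafka-0-10 package). With Structured Streaming, Kafka offsets are tracked in the checkpoint directory, so the Redis offset bookkeeping is no longer necessary:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('structured_streaming_kafka') \
    .getOrCreate()

# read from Kafka; offsets are recovered from the checkpoint on restart
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# offsets are stored in the checkpoint directory, not in Redis
query = df.selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/my_topic_checkpoint") \
    .start()

query.awaitTermination()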