1. 导入的包 ```python from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.functions import * from pyspark.sql.types import ArrayType, StringType,IntegerType,DoubleType, FloatType

    import ast

    1. 2. 常用代码
    2. - 连接方式.
    3. EH_SASL 是最重要的东西,不可以少!
    4. ```python
    5. EH_SASL ="""
    6. org.apache.kafka.common.security.plain.PlainLoginModule required
    7. username=""
    8. password="";
    9. """ # 本来是写成一行的,方便看才分行写。没有实验过分行写会不会有问题。写成一行的时候,用
    10. # 空格隔开
    11. # 订阅一个topic,默认从topic最早的offset到最近的offset
    12. kafka_url =spark\
    13. .readStream\
    14. .format("kafka")\
    15. .option("kafka.sasl.mechanism", "PLAIN")\
    16. .option("kafka.security.protocol", "SASL_PLAINTEXT")\
    17. .option("kafka.sasl.jaas.config", EH_SASL)\
    18. .option("kafka.bootstrap.servers", "ip:port列表,逗号分隔")\
    19. .option("subscribe", "phishing_url")\
    20. .load()
    21. # 订阅多个topic,并指定每个topic的订阅范围
    22. .option("subscribe", "topic1,topic2") \
    23. .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
    24. .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
    25. # 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
    26. .option("subscribePattern", "topic.*") \

    kafka返回的数据形式
    +——+——————————+———————+————-+——————-+————————————+——————————+
    | key| value | topic |partition| offset | timestamp |timestampType|
    +——+——————————+———————+————-+———————+————————————+—————————-+
    |null|byteoffset5505…|test_topic| 6|28424458|2019-12-24 15:04:…| 0|
    +——+——————————+———————+————-+———————+————————————+—————————-+

    • 字段选择(数据类型转换)

      1. # selectExpr()函数,类似sql的select和 as
      2. kafka_url = kafka_url.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp","timestampType")
    • 数据结构转换

      1. """
      2. kafka返回的数据都是字符串,需要将原有数据类型(如list、dict)转换回来list、dict。
      3. 转换函数的选择,只建议使用ast.literal_eval(),其他的转换函数如eval()存在漏洞风险。
      4. """
      5. @udf(returnType=StringType())
      6. def parseValue(col_value):
      7. url = ast.literal_eval(col_value)["url"]
      8. print(url)
      9. return url

    kafka_url_query=kafka_url.writeStream.outputMode(“append”).format(“console”).start()
    kafka_url_query.awaitTermination(60)
    kafka_url_query.stop()

    • 数据结构

    每一行都遵循下列模式:

    Column Type
    key binary
    value binary
    topic string
    partition int
    offset long
    timestamp long
    timestampType int