- 导入的包 ```python from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.functions import * from pyspark.sql.types import ArrayType, StringType,IntegerType,DoubleType, FloatType
import ast
2. 常用代码- 连接方式.EH_SASL 是最重要的东西,不可以少!```pythonEH_SASL ="""org.apache.kafka.common.security.plain.PlainLoginModule requiredusername=""password="";""" # 本来是写成一行的,方便看才分行写。没有实验过分行写会不会有问题。写成一行的时候,用# 空格隔开# 订阅一个topic,默认从topic最早的offset到最近的offsetkafka_url =spark\.readStream\.format("kafka")\.option("kafka.sasl.mechanism", "PLAIN")\.option("kafka.security.protocol", "SASL_PLAINTEXT")\.option("kafka.sasl.jaas.config", EH_SASL)\.option("kafka.bootstrap.servers", "ip:port列表,逗号分隔")\.option("subscribe", "phishing_url")\.load()# 订阅多个topic,并指定每个topic的订阅范围.option("subscribe", "topic1,topic2") \.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \# 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset.option("subscribePattern", "topic.*") \
kafka返回的数据形式
+——+——————————+———————+————-+——————-+————————————+——————————+
| key| value | topic |partition| offset | timestamp |timestampType|
+——+——————————+———————+————-+———————+————————————+—————————-+
|null|byteoffset5505…|test_topic| 6|28424458|2019-12-24 15:04:…| 0|
+——+——————————+———————+————-+———————+————————————+—————————-+
字段选择(数据类型转换)
# selectExpr()函数,类似sql的select和 askafka_url = kafka_url.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp","timestampType")
数据结构转换
"""kafka返回的数据都是字符串,需要将原有数据类型(如list、dict)转换回来list、dict。转换函数的选择,只建议使用ast.literal_eval(),其他的转换函数如eval()存在漏洞风险。"""@udf(returnType=StringType())def parseValue(col_value):url = ast.literal_eval(col_value)["url"]print(url)return url
kafka_url_query=kafka_url.writeStream.outputMode(“append”).format(“console”).start()
kafka_url_query.awaitTermination(60)
kafka_url_query.stop()
- 数据结构
每一行都遵循下列模式:
| Column | Type |
|---|---|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
