- 导入的包 ```python from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.functions import * from pyspark.sql.types import ArrayType, StringType,IntegerType,DoubleType, FloatType
import ast
2. 常用代码
- 连接方式.
EH_SASL 是最重要的东西,不可以少!
```python
EH_SASL ="""
org.apache.kafka.common.security.plain.PlainLoginModule required
username=""
password="";
""" # 本来是写成一行的,方便看才分行写。没有实验过分行写会不会有问题。写成一行的时候,用
# 空格隔开
# 订阅一个topic,默认从topic最早的offset到最近的offset
kafka_url =spark\
.readStream\
.format("kafka")\
.option("kafka.sasl.mechanism", "PLAIN")\
.option("kafka.security.protocol", "SASL_PLAINTEXT")\
.option("kafka.sasl.jaas.config", EH_SASL)\
.option("kafka.bootstrap.servers", "ip:port列表,逗号分隔")\
.option("subscribe", "phishing_url")\
.load()
# 订阅多个topic,并指定每个topic的订阅范围
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
# 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
.option("subscribePattern", "topic.*") \
kafka返回的数据形式
+——+——————————+———————+————-+——————-+————————————+——————————+
| key| value | topic |partition| offset | timestamp |timestampType|
+——+——————————+———————+————-+———————+————————————+—————————-+
|null|byteoffset5505…|test_topic| 6|28424458|2019-12-24 15:04:…| 0|
+——+——————————+———————+————-+———————+————————————+—————————-+
字段选择(数据类型转换)
# selectExpr()函数,类似sql的select和 as
kafka_url = kafka_url.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp","timestampType")
数据结构转换
"""
kafka返回的数据都是字符串,需要将原有数据类型(如list、dict)转换回来list、dict。
转换函数的选择,只建议使用ast.literal_eval(),其他的转换函数如eval()存在漏洞风险。
"""
@udf(returnType=StringType())
def parseValue(col_value):
url = ast.literal_eval(col_value)["url"]
print(url)
return url
kafka_url_query=kafka_url.writeStream.outputMode(“append”).format(“console”).start()
kafka_url_query.awaitTermination(60)
kafka_url_query.stop()
- 数据结构
每一行都遵循下列模式:
Column | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |