网上有很多scala和java链接kafka的教程,唯独pyspark链接kafka的教程非常非常少,少到看kafka官方文档都只是粗略介绍正常情况下的链接方式。摸索和google了几天,终于找到了pyspark连接kafka的方式,特地记录下:

  1. 正常情况下,python可以通过安装kafka-python来直接kafka,如: ```python

    pip install kafka-python

from kafka import KafkaConsumer urlKafkaConsumer = KafkaConsumer(‘ topicName’, group_id=’goldfish_svr_dev’, bootstrap_servers=’192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092’, session_timeout_ms=30 * 1000, fetch_max_bytes=8192, security_protocol=’SASL_PLAINTEXT’, # 加密协议 sasl_mechanism=’PLAIN’, sasl_plain_username=’xxx’, sasl_plain_password=’xxx’)

print(“—————\n getMsg()”) try: if urlKafkaConsumer is None: print(“urlKafkaConsumer is None”) else: print(“urlKafkaConsumer is “,urlKafkaConsumer) for msg in urlKafkaConsumer: print(msg)

  1. # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))

except KeyboardInterrupt as e: print(e)

  1. 2. 希望用到streaming的方式,就曲折一点。pyspark中首先要下载jar包【建议看完以下所有记录,再动手操作】
  2. 网上一般采用 spark-streaming-kafka-0.8_2.11_2.10.jar包,注意版本号!!!看引用。<br />但由于版本问题,我不使用spark-streaming-kafka 这个包,使用的是spark-sql-kafka-0-10_2.11-2.4.0.jar,配上 --jars 参数一起使用。--jars spark-sql-kafka-0-10_2.11-2.4.0.jar
  3. > 首先要明确三个版本号
  4. > scala
  5. > spark
  6. > kafka
  7. > 然后到[https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11/2.4.0](https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11/2.4.0) 下载对应版本.jar包
  8. > 三个版本号的关系是:
  9. > spark-streaming-kafka-0-8_2.11-2.1.0.jar文件的下载,其中,2.11表示scala的版本,2.1.0表示Spark版本号。。
  10. > 本教程安装的Spark版本号是2.1.0scala版本号是2.11,所以,一定要选择Kafka版本号是2.11开头的。比如,到Kafka官网中,可以下载安装文件Kafka_2.11-0.10.2.0.tgz,前面的2.11就是支持的scala版本号,后面的0.10.2.0Kafka自身的版本号。
  11. > 注意,spark2.3开始不支持spark-streaming-kafka-0-8 版本, spark-streaming-kafka-0-10版本不支持python。。。艹
  12. >
  13. > spark-sql-kafka-0-10_2.11-2.4.0.jar 这个包可以到 [https://search.maven.org/search](https://search.maven.org/search) 下载,查询语法和结果如下,语法自己悟吧。这个查询语句是:g:org.apache.spark AND v:2.4.0
  14. > ![image.png](https://cdn.nlark.com/yuque/0/2019/png/456650/1574487969439-3fa53e1f-772f-4c37-a3fd-206b81efb8d7.png#align=left&display=inline&height=171&name=image.png&originHeight=171&originWidth=1464&size=18043&status=done&width=1464)
  15. 提交了spark-sql-kafka-0-10_2.11-2.4.0.jar这个包,接下来就要连接kafka了,这里使用的流是:Struct streaming,毕竟另一种streaming实在是找不到连接方式。
  16. ```python
  17. # 下面这个变量是亮点,也是隐藏最深的东西,没了它,死活连不死。找了好几天。。。
  18. EH_SASL = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";'
  19. kafka_url =spark\
  20. .readStream\
  21. .format("kafka")\
  22. .option("kafka.sasl.mechanism", "PLAIN")\
  23. .option("kafka.security.protocol", "SASL_PLAINTEXT")\
  24. .option("kafka.sasl.jaas.config", EH_SASL)\
  25. .option("kafka.bootstrap.servers", "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092")\
  26. .option("subscribe", "phishing_url")\
  27. .load()
  28. kafka_url = kafka_url.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp","timestampType")