https://stackoverflow.com/questions/58766638/how-to-use-foreach-or-foreachbatch-in-pyspark-to-write-to-database
    Not tested yet; from a read-through it looks like it should work.
    Note: foreach and foreachBatch are for processing streaming data.
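    As a quick contrast (this sketch is not from the linked answer; stream_df is a
    placeholder streaming DataFrame): foreach invokes its callback once per Row,
    while foreachBatch invokes it once per micro-batch DataFrame, which is what
    makes batch-style writers such as the JDBC writer usable.

    # foreach: the function is called once per Row of the stream
    def handle_row(row):
        print(row)

    # foreachBatch: the function is called once per micro-batch,
    # receiving a regular (non-streaming) DataFrame plus an epoch id
    def handle_batch(batch_df, epoch_id):
        print(epoch_id, batch_df.count())

    # stream_df.writeStream.foreach(handle_row).start()
    # stream_df.writeStream.foreachBatch(handle_batch).start()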

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from pyspark.sql.types import StructField, StructType, StringType, DoubleType

    # Configuration of the target database
    db_target_url = "jdbc:mysql://localhost/database"
    db_target_properties = {"user": "writer", "password": "1234"}

    # Schema of the JSON payload carried in the Kafka messages
    schema_simple = StructType([
        StructField("Signal", StringType()),
        StructField("Value", DoubleType())
    ])

    # Create the Spark session
    spark = SparkSession.builder.appName("streamer").getOrCreate()

    # Create a streaming DataFrame from the Kafka topic
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "mytopic") \
        .load() \
        .selectExpr("Timestamp", "cast(value as string) as json") \
        .select("Timestamp", F.from_json("json", schema_simple).alias("json_wrapper")) \
        .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
    df.printSchema()

    # Do some dummy processing
    df2 = df.filter("Value < 11111111111")
    print("df2: ", df2.isStreaming)

    # foreachBatch hands over each micro-batch as a regular DataFrame plus an epoch id,
    # so the batch can be written with the normal JDBC writer
    def process_batch(batch_df, epoch_id):
        batch_df.write.jdbc(url=db_target_url, table="mytopic",
                            mode="append", properties=db_target_properties)

    query = df2.writeStream.foreachBatch(process_batch).start()
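    One follow-up (assuming the query object above): start() returns immediately, so a
    standalone script normally blocks on the query. Also note that the Kafka source and
    the MySQL JDBC driver are separate dependencies that have to be supplied to
    spark-submit (e.g. via --packages / --jars).

    # keep the driver alive until the streaming query stops
    query.awaitTermination()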