Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. You can use the Dataset/DataFrame API to express streaming aggregations, event-time windows, stream-to-batch joins, and more.
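
As a taste of that API, here is a minimal self-contained sketch (not from the original) of an event-time windowed count, using Spark's built-in rate test source:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder.master("local[2]").appName("WindowSketch").getOrCreate()
import spark.implicits._

// The rate source emits (timestamp, value) rows and is handy for local experiments.
val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

val counts = events
  .withWatermark("timestamp", "10 minutes")    // bound the state kept for late data
  .groupBy(window($"timestamp", "5 minutes"))  // event-time window
  .count()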

Receiving Kafka messages

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession
  .builder
  .master("local[2]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.57.31.159:9092,192.168.6.99:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()

df.writeStream
  .option("checkpointLocation", "hdfs://ns1/tmp/kafka_offset")
  .trigger(Trigger.ProcessingTime(3000))       // fire a micro-batch every 3 seconds
  .foreachBatch { (ds, batchId) =>             // ds is the Dataset[Row] for this micro-batch
    println(batchId)
    ds.foreach { one =>
      one.schema.printTreeString()             // note: runs (and prints) on the executors
      // key/value are binary; key may be null if the producer did not set one
      println(new String(one.getAs[Array[Byte]]("key"), "UTF-8"))
      println(new String(one.getAs[Array[Byte]]("value"), "UTF-8"))
      println(one.getAs[String]("topic"))
      println(one.getAs[Int]("partition"))
      println(one.getAs[Long]("offset"))
      println(one.getAs[java.sql.Timestamp]("timestamp"))
      println(one.getAs[Int]("timestampType"))
    }
  }
  .start()
  .awaitTermination()

schema:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
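
Since key and value arrive as binary columns, an alternative to decoding them by hand inside foreach is to cast them in SQL before any further processing. A minimal sketch, building on the df defined above:

// Decode the Kafka key/value to strings up front; the other columns pass through.
val decoded = df.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic", "partition", "offset", "timestamp")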

Option settings:

| Option | Description |
| --- | --- |
| kafka.bootstrap.servers | Kafka cluster (broker list) |
| subscribe | Topic(s) to subscribe to; multiple topics may be listed (comma-separated) |
| subscribePattern | Subscribe to topics matching a regex pattern |
| startingOffsets | Starting offset: earliest or latest |
| endingOffsets | Ending offset: latest (batch queries only) |
| kafkaConsumer.pollTimeoutMs | Timeout, in milliseconds, for polling data from Kafka in the executors |
| fetchOffset.numRetries | Number of retries before giving up on fetching Kafka offsets |
| fetchOffset.retryIntervalMs | Milliseconds to wait before retrying a Kafka offset fetch |
| maxOffsetsPerTrigger | Rate limit on the maximum number of offsets processed per trigger interval; the specified total is split proportionally across topicPartitions of different volume |
| minPartitions | Minimum number of partitions to read from Kafka |
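
A minimal sketch (not from the original; brokers, pattern, and limits are placeholders) of how several of these options combine:

val patternDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribePattern", "test-.*")     // regex subscription instead of a fixed topic list
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "10000")   // at most 10000 offsets per trigger interval
  .option("minPartitions", "8")              // ask Spark for at least 8 read partitions
  .load()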

Practical experience

  1. If an exception is thrown while processing the stream, the entire streaming query is terminated.
  2. Multiple jobs must not run in parallel against the same checkpoint directory.
  3. Only when a checkpoint location is set will the query resume from the offset where consumption was last interrupted; otherwise the startingOffsets policy applies.
  4. By default, on the order of 100 offset files are retained; on each start the newest offset file is picked up, so you can re-consume by editing the data in that newest file (see the sketch after this list for an alternative).
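
Regarding points 3 and 4: rather than hand-editing checkpoint files, a fresh query (with a new, empty checkpoint location) can pin its start position using the per-partition JSON form of startingOffsets supported by the Kafka source. A minimal sketch, reusing the partition offsets from the sample offset file shown further below:

// -2 means earliest, -1 means latest; concrete numbers are absolute per-partition offsets.
val replayDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.57.31.159:9092,192.168.6.99:9092")
  .option("subscribe", "test")
  .option("startingOffsets", """{"test":{"0":10490,"1":10249,"2":10493,"3":10582}}""")
  .load()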


Approximate contents of an offset file:

v1
{"batchWatermarkMs":0,"batchTimestampMs":1586851540003,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"test":{"2":10493,"1":10249,"3":10582,"0":10490}}
  5. If the offsets to be consumed are already smaller than the corresponding partition's smallest offset (for example, the data has been deleted by retention), the query is terminated; see the sketch below for a way around this.
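
If aborting is undesirable in that case, the Kafka source's failOnDataLoss option (default true) can be set to false so the query logs the data loss and continues instead of failing. A minimal sketch:

val tolerantDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.57.31.159:9092,192.168.6.99:9092")
  .option("subscribe", "test")
  .option("failOnDataLoss", "false")   // don't abort when requested offsets were deleted by retention
  .load()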

Reference:

http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html