Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can use the Dataset/DataFrame API to express streaming aggregations, event-time windows, stream-to-batch joins, and more.
Receiving Kafka messages:
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession
  .builder
  .master("local[2]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.57.31.159:9092,192.168.6.99:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()

df.writeStream
  .option("checkpointLocation", "hdfs://ns1/tmp/kafka_offset")
  .trigger(Trigger.ProcessingTime(3000))
  .foreachBatch { (ds: DataFrame, batchId: Long) =>
    println(batchId)
    ds.printSchema()
    ds.foreach { one =>
      // key and value arrive as binary and must be decoded explicitly
      println(new String(one.getAs[Array[Byte]]("key"), "UTF-8"))
      println(new String(one.getAs[Array[Byte]]("value"), "UTF-8"))
      println(one.getAs[String]("topic"))
      println(one.getAs[Int]("partition"))
      println(one.getAs[Long]("offset"))
      println(one.getAs[java.sql.Timestamp]("timestamp"))
      println(one.getAs[Int]("timestampType"))
    }
  }
  .start()
  .awaitTermination()
```
Schema of the Kafka source:

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
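Because `key` and `value` are binary columns, the Kafka integration guide's usual approach is to cast them in SQL before any further processing, which avoids the manual byte-array decoding shown above. A minimal sketch:

```scala
// Cast the binary key/value columns to strings up front;
// downstream operators then see plain string columns.
val messages = df.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic", "partition", "offset", "timestamp")
```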
Source options:
| Option | Description |
|---|---|
| kafka.bootstrap.servers | Kafka broker list |
| subscribe | Topics to subscribe to; multiple topics may be listed, comma-separated |
| subscribePattern | Subscribe to all topics whose names match a regex pattern |
| startingOffsets | Starting offset: earliest or latest |
| endingOffsets | Ending offset: latest (batch queries only) |
| kafkaConsumer.pollTimeoutMs | Timeout in milliseconds for polling data from Kafka in executors |
| fetchOffset.numRetries | Number of retries before giving up fetching Kafka offsets |
| fetchOffset.retryIntervalMs | Milliseconds to wait before retrying a Kafka offset fetch |
| maxOffsetsPerTrigger | Rate limit on the maximum number of offsets processed per trigger interval; the total is split proportionally across topicPartitions of different volumes |
| minPartitions | Desired minimum number of partitions to read from Kafka |
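As a sketch of how several of these options combine (the topic regex and the numeric values here are illustrative placeholders, not recommendations):

```scala
// Pattern subscription with throttling: match topics by regex instead of a
// fixed list, and cap how many offsets each trigger may process.
val throttled = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.57.31.159:9092,192.168.6.99:9092")
  .option("subscribePattern", "test.*")        // regex over topic names
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "10000")     // split proportionally across partitions
  .option("minPartitions", "8")                // ask for at least 8 read partitions
  .load()
```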
Lessons from practice
- If an exception is thrown during stream processing, the whole streaming query is terminated.
- Multiple queries must not run in parallel against the same checkpoint directory.
- Only when a checkpoint is configured will the query resume from the offsets where consumption last stopped; otherwise the startingOffsets policy is applied.
- By default, on the order of 100 offset files are retained; on each start the newest offset file is located and used, so by editing the data in that newest file you can re-consume from earlier offsets.
Approximate contents of an offset file:

```
v1
{"batchWatermarkMs":0,"batchTimestampMs":1586851540003,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"test":{"2":10493,"1":10249,"3":10582,"0":10490}}
```
- If the offsets waiting to be consumed are already smaller than the smallest retained offset of the corresponding partition, the query is terminated.
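The last line of the offset file is a JSON map from topic to `{partition: offset}`. A minimal sketch of reading it back with json4s (which ships as a Spark dependency; the sample line is copied from the dump above):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Last line of the newest file under <checkpointLocation>/offsets/
val line = """{"test":{"2":10493,"1":10249,"3":10582,"0":10490}}"""

// topic -> (partition -> next offset to consume)
val offsets: Map[String, Map[String, Long]] =
  parse(line).extract[Map[String, Map[String, Long]]]

offsets("test").foreach { case (partition, offset) =>
  println(s"partition $partition resumes at offset $offset")
}
```

Editing these per-partition numbers (and restarting the query) is what makes the re-consumption trick in the bullet list above work.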
Reference:
http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html