Change Log

| Date | Changes | Author |
|---|---|---|
| 2022-10-31 | Document created | 付飞 |
| 2022-11-01 | Added parsing of the default mainArgsConfig configuration class | 付飞 |
| 2022-11-02 | 1. Distinguish required vs. optional configuration items and add defaults for the optional ones 2. Added support for the Jdbc, Print and Kafka sink components 3. Added building a flow definition from the Flow main-args parameter and launching the corresponding application 4. Other minor adjustments | 付飞 |
| 2022-11-07 | 1. Added Kafka source deserialization definition and type support 2. Added the shareStreamName attribute | 付飞 |
| 2022-11-09 | 1. Added merge… operator optimization support 2. Updated the logic of several operators and fixed some bugs | 付飞 |
| 2022-11-10 | Added scaffold orchestration examples (continuously updated) | 付飞 |
| 2022-11-15 | Added the JsonMapper operator (operator mapping rules will be updated continuously) | 付飞 |
| 2022-12-19 | Added native-code support for transform and sink | 付飞 |
| 2023-01-04 | Added a switch for enabling checkpoints; added the Kafka source commit-offsets-on-checkpoint option | 付飞 |
| 2023-01-16 | Added a global job parallelism setting; added a local-debug web UI option | 付飞 |
Hello World
In the Hello World example we consume data from Kafka (source), compute the length of each record (transform), and then print the result to the console (sink).
1. Add the scaffold dependency
```xml
<dependency>
    <groupId>com.zxelec</groupId>
    <artifactId>flink-scaffold</artifactId>
    <version>1.1</version>
</dependency>
```
2. Define the application entry class
```java
package com.zxelec.demo;

import com.zxelec.demo.process.SizeMap;
import com.zxelec.scaffold.annotation.Flow;
import com.zxelec.scaffold.annotation.Sink;
import com.zxelec.scaffold.annotation.Source;
import com.zxelec.scaffold.annotation.Stream;
import com.zxelec.scaffold.annotation.Transform;
import com.zxelec.scaffold.app.Application;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

@Flow(streams = {
    @Stream(
        sources = {@Source(sourceFunction = KafkaSource.class, configPrefix = "demo")},
        transforms = {@Transform(transformFunction = SizeMap.class)},
        sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
    )
})
public class HelloWorldApplication extends Application {
}
```
SizeMap.java
```java
package com.zxelec.demo.process;

import org.apache.flink.api.common.functions.MapFunction;

public class SizeMap implements MapFunction<String, Integer> {
    @Override
    public Integer map(String json) throws Exception {
        return json.length();
    }
}
```
3. Define the configuration file
```properties
# Application name
app.name=HelloWorld
# Basic environment configuration
env.checkpoint.enable=true
env.checkpoint.interval=30
env.checkpoint.directory=file:///Users/felix/Desktop/ck
env.restart.attempts=3
env.restart.delay.interval=10
# kafka
demo.name=car_stream
demo.bootstrap.servers=192.168.1.249:9092
demo.topic=car_stream
demo.group.id=demo_group_01
demo.auto.offset.reset=earliest
demo.enable.auto.commit=false
```
4. Launch parameters
--MainClass com.zxelec.demo.HelloWorldApplication --ConfigPath ${path to the application.properties file}
With the steps above you have a running Flink application that reads data from Kafka, computes the length of each record, and prints the result to the console.
Annotations
@Flow
@Flow defines a Flink application.

mainArgsConfig (optional)
The parser class for the values passed in through the mainArgs launch parameter. The parsed values are placed into the application configuration container ParameterTool, where config-enhanced operators (ConfigMapFunction and the like) can read them for business processing. The parser class must implement the MainArgsConfig interface; the annotation usage and the interface definition are shown below:
```java
@Flow(mainArgsConfig = <? extends MainArgsConfig>, streams = {@Stream(), ...})
```
```java
/**
 * @author: Felix
 * @description: parses the main args parameter
 * @date: 15:45 2022/10/25
 * @version: 1.0
 */
public interface MainArgsConfig {
    /**
     * Parse the configuration items.
     * @param mainArgs the raw main-args string
     * @return the parsed key/value pairs
     */
    HashMap<String, String> process(String mainArgs);
}
```
Note: this version of the scaffold already ships with a default mainArgs parser that automatically parses flat (single-level) key/value JSON and wraps it into the ParameterTool configuration container. To use it, you do not need to add an explicit definition such as mainArgsConfig = MainArgsConfig.class. The flat JSON it handles looks like this:
{"name": "zhangsan","age": 18}
streams (required)
Defines one or more Flink stream-processing nodes @Stream(). The scaffold processes them in the order they are declared, so when one @Stream node's source depends on an intermediate stream of another @Stream node, pay attention to the declaration order.
@Stream
@Stream defines a Flink stream-processing node.

sources (required)
Defines one or more data sources @Source().
```java
@Stream(sources = {@Source(), ...}, transforms = {@Transform(), ...}, sinks = {@Sink(), ...})
```
transforms (optional)
Defines one or more transformation operators @Transform().

sinks (optional)
Defines one or more sinks @Sink().
@Source
@Source defines a concrete data source.

sourceFunction (optional)
The class of the desired data source. The scaffold provides most of the commonly used sources, used through the rule of source class + configuration items. For a custom data source, implement Flink's SourceFunction interface or one of its abstract subclasses.

configPrefix (optional)
Used inside the source instance to pick out the configuration items it needs. (Note: custom data sources do not support this yet.)

streamName (optional)
The identifier of an intermediate stream of some Flink processing node that this source consumes. (The shareStreamNames attribute on a @Transform annotation defines the intermediate stream's identifier.)

shareStreamName (optional)
Shared-stream identifier; define it when other stream-processing nodes need to consume this stream.
```java
@Source(sourceFunction = <? extends SourceFunction>, configPrefix = "", streamName = "", shareStreamName = "")
```
@Transform
```java
@Transform(transformFunction = <? extends Function>, shareStreamNames = {}, isEnable = {})
```
@Transform defines a transformation operator. When several data sources are defined and the same operator needs different business handling per source, add a configPrefix attribute to each source definition and implement the transform as a config-enhanced operator, so that a separate instance can be created for each source. (See the examples section of this document.)
transformFunction (required)
The transformation operator class. (See the Transform operators section.)

shareStreamNames (optional)
Shared-stream identifiers; define them when other stream-processing nodes need to consume the transformed stream.

isEnable (optional)
When parsing the flow, the scaffold reads the application configuration key ${isEnable}.enable to decide whether this transformation operator is applied. (For example, with a single source and isEnable = {"map"}, the transform reads the configuration item map.enable to decide whether to perform the stream transformation; see the sketch below.)
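For illustration only (DemoMap is a hypothetical operator, and the key name map is chosen to match isEnable = {"map"}), the switch ties the annotation and the application configuration together like this:

```java
// The transform below is applied only when the application config contains map.enable=true.
@Stream(
    sources = {@Source(sourceFunction = KafkaSource.class)},
    transforms = {@Transform(transformFunction = DemoMap.class, isEnable = {"map"})},
    sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
)
```

```properties
# Switch read by the scaffold for the transform above
map.enable=true
```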
@Sink
@Sink defines a concrete sink.
```java
@Sink(sinkFunction = <? extends SinkFunction>, configPrefix = "")
```
sinkFunction (required)
The class of the desired sink.

configPrefix (optional)
Used inside the sink instance to pick out the configuration items it needs.
Launch commands

Launching an annotation-based application

When launching via the REST API, entryClass is com.zxelec.scaffold.app.Application.

--MainClass {fully qualified name of the application class} [--ConfigPath {absolute path of the configuration file}] [--MainArgs {main args}]

Example:

--MainClass com.zxelec.demo.HelloWorldApplication --ConfigPath /Users/felix/config/application.properties --MainArgs {\"name\":\"felix\"}

Launching a parameter-based application

When launching via the REST API, entryClass is com.zxelec.scaffold.app.MainArgsApplication.

--Flow {flow JSON} [--ConfigPath {absolute path of the configuration file}] [--MainArgs {main args}]

Example:

--Flow {…} --ConfigPath /Users/felix/config/application.properties --MainArgs {\"name\":\"felix\"}

A sample of the flow JSON format:
{"mainArgsConfig": "com.zxelec.relation.config.LoadMainArgsConfig""streams": [{"sources": [{"sourceFunction": "org.apache.flink.connector.kafka.source.KafkaSource","configPrefix": "stream1"},{"sourceFunction": "org.apache.flink.connector.kafka.source.KafkaSource","configPrefix": "stream2"}],"transforms": [{"transformFunction": "com.zxelec.relation.component.DataStandardMap"},{"transformFunction": "com.zxelec.relation.component.DataStandardFilter"}],"sinks": [{"sinkFunction": "com.zxelec.relation.component.RelationSink","configPrefix": "relation"}]},{...}]}
Scaffold components
Source
Kafka
```java
sourceFunction = KafkaSource.class

/**
 * [{configPrefix}.]kafka.name=car_stream                      // required
 * [{configPrefix}.]kafka.bootstrap.servers=192.168.1.249:9092 // required
 * [{configPrefix}.]kafka.topic=car_stream                     // required
 * [{configPrefix}.]kafka.group.id=demo01                      // required
 * [{configPrefix}.]kafka.auto.offset.reset=earliest           // required
 * [{configPrefix}.]kafka.enable.auto.commit=false             // required
 * [{configPrefix}.]kafka.request.timeout.ms=10000             // default: 60000
 * [{configPrefix}.]kafka.session.timeout.ms=10000             // default: 40000
 * [{configPrefix}.]kafka.commit.offsets.on.checkpoint=false   // default: false
 *
 * --- kafka.deserialization.type start ---
 * kafka.deserialization.type: simple / KVTimestamp
 *
 * simple:      wraps the Kafka value as a String record
 * KVTimestamp: takes the Kafka key/value/timestamp and wraps them, in that order,
 *              into a Tuple3<String, String, Long> record
 *
 * [{configPrefix}.]kafka.deserialization.type=simple          // default: simple
 * --- kafka.deserialization.type end ---
 *
 * To be continued...
 */
```
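As an illustration of the KVTimestamp mode, a downstream operator receives Tuple3<String, String, Long> records. The operator below is a made-up example, not part of the scaffold:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// With kafka.deserialization.type=KVTimestamp the record fields are:
// f0 = Kafka key, f1 = Kafka value, f2 = record timestamp (epoch millis).
public class KVTimestampMap implements MapFunction<Tuple3<String, String, Long>, String> {
    @Override
    public String map(Tuple3<String, String, Long> record) throws Exception {
        return record.f0 + " -> " + record.f1 + " @ " + record.f2;
    }
}
```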
Transform
Native operators
Flink's built-in operators:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/
Scaffold operators:
```java
/**
 * Combines keyBy and keyedProcess.
 * @param <K> key type
 * @param <I> input type
 * @param <O> output type
 */
public abstract class MergeKeyedProcessFunction<K, I, O> extends KeyedProcessFunction<K, I, O> {
    public abstract KeySelector<I, K> keyBy();
}
```
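A minimal usage sketch follows (class name and logic are illustrative; the selector returned by keyBy() is presumably used to key the stream before the keyed processing is applied):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

// Illustrative subclass: keys String records by their own value and emits their length.
public class DemoMergeKeyedProcess extends MergeKeyedProcessFunction<String, String, Integer> {

    @Override
    public KeySelector<String, String> keyBy() {
        // Selector used to key the stream before the keyed processing below.
        return value -> value;
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Integer> out) throws Exception {
        out.collect(value.length());
    }
}
```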
Config-enhanced operators
Config-enhanced operators build on the native operators and can access the configuration loaded when the application starts, including environment configuration and business configuration. To use one, extend the corresponding config operator. Example:
```java
package com.zxelec.demo.process;

import com.zxelec.scaffold.component.ConfigMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;

public class DemoConfigMap extends ConfigMapFunction<String, String> {

    public DemoConfigMap(ParameterTool parameterTool, String configPrefix) {
        super(parameterTool, configPrefix);
        // parameterTool is the application configuration container
        // configPrefix is the prefix attribute defined on the input stream this operator processes
    }

    @Override
    public String map(String o) throws Exception {
        return null;
    }
}
```
Config-enhanced operators currently provided by the scaffold:
ConfigFilterFunction
ConfigFlatMapFunction
ConfigKeyedCoProcessFunction
ConfigKeyedProcessFunction
ConfigKeySelector
ConfigMapFunction
ConfigProcessJoinFunction
ConfigSerializableTimestampAssigner
ConfigSinkFunction
ConfigMergeKeyedProcessFunction
JsonMapper
JsonMapper builds on the ConfigMapFunction operator: given a JSON mapping file or mapping parameter, it automatically converts a source JSON string into the target JSON string.
```java
@Flow(streams = {
    @Stream(
        sources = {@Source(sourceFunction = KafkaSource.class)},
        transforms = {@Transform(transformFunction = JsonMapper.class)},
        sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
    )
})
public class JsonApplication extends Application {
}
```
```properties
app.name=HelloWorld
# Basic environment configuration
env.checkpoint.interval=30
env.checkpoint.directory=file:///Users/reality/Desktop/ck
env.restart.attempts=3
env.restart.delay.interval=10
# kafka
kafka.name=imsi01
kafka.bootstrap.servers=192.168.1.249:9092
kafka.topic=imsi01
kafka.group.id=demo00
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
# Mapping file path (lower priority)
json.mapper.file=/Users/reality/zx/project/flink-bootstrap/flink-bootstrap-component/flink-bootstrap-component-demo/imsi.json
# Inline mapping content (higher priority)
json.mapper.param=
```
Mapping entry fields:
- targetName: name of the output field
- jsonPath: JsonPath expression used to extract the output value
- transformType: conversion type applied to the output value
- transformRule: conversion rule
[{"targetName": "objectId","jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PlateNo,$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PlateColor",// join转换:提取多项源数据字段值,以transformRule.join字符串进行拼接。// 注意:多项源字段名以jsonPath语法用逗号拼接赋值于jsonPath字段"transformType": "join","transformRule": {"join": "_"}},{"targetName": "trailTime","jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PassTimer",// 将源数据时间字符串字段值,转换为其它日期格式字符串"transformType": "dateTime","transformRule": {"sourceDateFormat": "yyyyMMddHHmmssSSS","targetDateFormat": "yyyy-MM-dd HH:mm:ss"}},{// 提取源数据值,不做转换处理"targetName": "enterTime","jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PassTime"},{"targetName": "trailRemark",// 新增字段并赋值transformRule.defaultValue"transformType": "default","transformRule": {"defaultValue": "过车数据"}},{"targetName": "processTime","jsonPath": "$.shotTime",// 将源数据时间戳时间格式化至指定日期格式字符串"transformType": "timeStamp","transformRule": {"dateformat": "yyyy-MM-dd HH:mm:ss"}}// 等待更新...]
NativeTransform
Native Flink code can be written in a nativeTransform, giving the scaffold maximum business flexibility. Example:
```java
public class RelationNativeTransform implements NativeTransform {

    /**
     * Business processing.
     * @param dataStreamList list of Tuple2<stream, the stream's configPrefix>
     * @param config configuration container
     * @return must have the same shape as dataStreamList; keep the stream and prefix order,
     *         otherwise downstream processing will break
     */
    @Override
    public List<Tuple2<DataStream, String>> process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
        // TODO... business logic
        return null;
    }
}
```
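As a sketch of what such a body might look like (it assumes every incoming stream carries String records; the class name and logic are illustrative, and the NativeTransform import from the scaffold package is omitted):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;

public class UppercaseNativeTransform implements NativeTransform {

    @Override
    @SuppressWarnings("unchecked")
    public List<Tuple2<DataStream, String>> process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
        List<Tuple2<DataStream, String>> result = new ArrayList<>();
        for (Tuple2<DataStream, String> entry : dataStreamList) {
            // Assumption: the incoming streams carry String records.
            DataStream<String> in = entry.f0;
            DataStream<String> out = in.map(String::toUpperCase).returns(String.class);
            // Hand the streams back in the same order and with the same config prefixes.
            result.add(new Tuple2<DataStream, String>((DataStream) out, entry.f1));
        }
        return result;
    }
}
```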
Sink
JdbcSink
- Implement the JdbcSink interface and complete its methods according to your business.
 
```java
package com.zxelec.demo.process;

import com.zxelec.scaffold.component.sink.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

/**
 * @author: Felix
 * @description: DemoJdbcSink
 * @date: 16:02 2022/11/2
 * @version: 1.0
 */
public class DemoJdbcSink implements JdbcSink<Relation> {

    @Override
    public String prepareStatement() {
        return "INSERT INTO relation VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    }

    @Override
    public JdbcStatementBuilder<Relation> setParameters() {
        return (statement, relation) -> {
            statement.setString(1, relation.getLayoutObjectId());
            statement.setString(2, relation.getAccompanyObjectId());
            statement.setLong(3, relation.getLayoutEventTime() / 1000);
            statement.setLong(4, relation.getAccompanyEventTime() / 1000);
            statement.setString(5, relation.getLayoutAddressId());
            statement.setString(6, relation.getAccompanyAddressId());
            statement.setString(7, relation.getLayoutObjectType());
            statement.setString(8, relation.getAccompanyObjectType());
            statement.setString(9, relation.getGid());
            statement.setString(10, relation.getLayoutRemark());
            statement.setString(11, relation.getAccompanyRemark());
            statement.setLong(12, System.currentTimeMillis() / 1000);
        };
    }
}
```
- Add the database-related configuration items.
 
```properties
[{configPrefix}.]jdbc.url=jdbc:clickhouse://192.168.1.101:8123/demo
[{configPrefix}.]jdbc.username=zhangsan
[{configPrefix}.]jdbc.password=123456
[{configPrefix}.]jdbc.driver.name=com.clickhouse.jdbc.ClickHouseDriver
# jdbc.data.batch.size (default: 400)
[{configPrefix}.]jdbc.data.batch.size=400
# jdbc.data.batch.interval.ms (default: 200)
[{configPrefix}.]jdbc.data.batch.interval.ms=200
```
- Application configuration
 
```java
sinks = {@Sink(sinkFunction = DemoJdbcSink.class)}
```
PrintSinkFunction
- Application configuration:
 
```java
sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
```
KafkaSink
- Add the Kafka sink configuration items.
 
```properties
[{configPrefix}.]kafka.sink.brokers=192.168.1.1
[{configPrefix}.]kafka.sink.topic=demo_topic
# exactly-once / at-least-once (default: at-least-once)
[{configPrefix}.]kafka.sink.mode=at-least-once
```
- Application configuration
 
```java
sinks = {@Sink(sinkFunction = KafkaSink.class)}
```
NativeSink
Native Flink code can be written in a nativeSink, giving the scaffold maximum business flexibility. Example:
```java
public class RelationNativeSink implements NativeSink {

    /**
     * Business processing.
     * @param dataStreamList list of Tuple2<stream, the stream's configPrefix>
     * @param config configuration container
     */
    @Override
    public void process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
        // TODO... business logic
    }
}
```
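A minimal sketch of such a body (illustrative only; the NativeSink import from the scaffold package is omitted, and it simply prints every incoming stream):

```java
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;

public class PrintNativeSink implements NativeSink {

    @Override
    public void process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
        for (Tuple2<DataStream, String> entry : dataStreamList) {
            // Prefix each printed record with the stream's config prefix so the streams can be told apart.
            entry.f0.print(entry.f1);
        }
    }
}
```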
Environment configuration items
```properties
# Application name (default: defaultStreamApp)
app.name=HelloWorld

# -------- parallelism config start --------
# Global job parallelism (default: 1)
env.parallelism=1
# -------- parallelism config end --------

# -------- local-debug web UI config start --------
# (default: false)
env.local.web.ui.enable=true
# (default: 8081)
env.local.web.ui.port=8081
# After enabling, add the following Maven dependency; the UI is served at localhost:8081
# <dependency>
#     <groupId>org.apache.flink</groupId>
#     <artifactId>flink-runtime-web_2.12</artifactId>
#     <version>1.14.6</version>
# </dependency>
# -------- local-debug web UI config end --------

# -------- checkpoint config start --------
# Whether checkpoints are enabled (true / false, default: false)
env.checkpoint.enable=true
# Mode (exactly_once / at_least_once, default: at_least_once)
env.checkpoint.mode=exactly_once
# Checkpoint trigger interval (milliseconds, default: 600000)
env.checkpoint.interval=60000
# Checkpoint storage directory (required)
env.checkpoint.directory=file:///Users/reality/Desktop/ck
# Restart attempts on failure (default: 3)
env.restart.attempts=5
# Restart delay on failure (seconds, default: 10)
env.restart.delay.interval=30
# -------- checkpoint config end --------

# To be continued...
```
Scaffold orchestration examples
Example 1: simple single-stream processing
```java
@Flow(streams = {
    @Stream(
        sources = {@Source(sourceFunction = KafkaSource.class)},
        transforms = {@Transform(transformFunction = DemoMap.class)},
        sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
    )
})
public class Demo1Application extends Application {
}
```
```properties
# kafka source config
kafka.name=demo01
kafka.bootstrap.servers=192.168.1.1:9092
kafka.topic=demo01
kafka.group.id=demo01
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
```
```java
public class DemoMap implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) throws Exception {
        // business processing...
        return value.length();
    }
}
```
Example 2: custom main-args parsing, with a single stream and transformation of its intermediate stream
```java
@Flow(
    mainArgsConfig = CsvMainArgs.class,
    streams = {
        @Stream(
            sources = {@Source(sourceFunction = KafkaSource.class)},
            transforms = {@Transform(
                transformFunction = DemoMap.class,
                // identifier of the transformed stream, used as a source by other stream nodes
                shareStreamNames = {"mapStream"})},
            sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
        ),
        @Stream(
            sources = {@Source(
                // reference the stream identifier produced by an earlier node and use it as this node's source
                streamName = "mapStream")},
            transforms = {@Transform(transformFunction = SizeMap.class)},
            sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
        )
    })
public class Demo2Application extends Application {
}
```
```properties
# kafka source config
kafka.name=demo01
kafka.bootstrap.servers=192.168.1.1:9092
kafka.topic=demo01
kafka.group.id=demo01
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
```
```java
public class CsvMainArgs implements MainArgsConfig {
    @Override
    public HashMap<String, String> process(String args) {
        // TODO parsing logic...
        return null;
    }
}
```
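For illustration, a body for CsvMainArgs might parse a hypothetical "key=value,key=value" argument string as sketched below (the argument format is an assumption of this example, not something the scaffold prescribes):

```java
import java.util.HashMap;

public class CsvMainArgs implements MainArgsConfig {

    @Override
    public HashMap<String, String> process(String args) {
        HashMap<String, String> result = new HashMap<>();
        if (args == null || args.isEmpty()) {
            return result;
        }
        // Assumed format: "name=felix,age=18"
        for (String pair : args.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                result.put(kv[0].trim(), kv[1].trim());
            }
        }
        return result;
    }
}
```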
```java
public class DemoMap implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) throws Exception {
        // business processing...
        return value.length();
    }
}
```
```java
public class SizeMap implements MapFunction<String, Integer> {
    @Override
    public Integer map(String json) throws Exception {
        return json.length();
    }
}
```
Example 3: two streams transformed and then connected
```java
@Flow(
    mainArgsConfig = CsvMainArgs.class,
    streams = {
        // node 1: read the order stream, transform it and key it
        @Stream(
            sources = {@Source(sourceFunction = KafkaSource.class, configPrefix = "order")},
            transforms = {
                @Transform(transformFunction = OrderInfo.class),
                @Transform(
                    transformFunction = OrderKey.class,
                    // identifier of this transformed stream, used as a source by a later node
                    shareStreamNames = "orderKeyStream")
            }
        ),
        // node 2: read the user stream, transform it and key it
        @Stream(
            sources = {@Source(sourceFunction = KafkaSource.class, configPrefix = "admin")},
            transforms = {
                @Transform(transformFunction = AdminInfo.class),
                @Transform(
                    transformFunction = AdminKey.class,
                    // identifier of this transformed stream, used as a source by a later node
                    shareStreamNames = "adminKeyStream")
            }
        ),
        // node 3: take the keyed order and user streams and process records that share the same key
        @Stream(
            sources = {
                @Source(streamName = "orderKeyStream"),
                @Source(streamName = "adminKeyStream")
            },
            transforms = {@Transform(transformFunction = OrderAndAdminConnect.class)},
            sinks = {@Sink(sinkFunction = PrintSinkFunction.class)}
        )
    })
public class Demo3Application extends Application {
}
```
```properties
# order kafka source config
kafka.name=order
kafka.bootstrap.servers=192.168.1.1:9092
kafka.topic=order_topic
kafka.group.id=order01
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
# admin kafka source config
admin.kafka.name=admin
admin.kafka.bootstrap.servers=192.168.1.1:9092
admin.kafka.topic=admin_topic
admin.kafka.group.id=admin01
admin.kafka.auto.offset.reset=earliest
admin.kafka.enable.auto.commit=false
```
```java
public class OrderInfo implements MapFunction<String, String> {
    @Override
    public String map(String order) throws Exception {
        // business processing...
        return order;
    }
}
```

```java
public class OrderKey implements KeySelector<String, String> {
    @Override
    public String getKey(String order) throws Exception {
        // key extraction...
        return order;
    }
}
```

```java
public class AdminInfo implements MapFunction<String, String> {
    @Override
    public String map(String admin) throws Exception {
        // business processing...
        return admin;
    }
}
```

```java
public class AdminKey implements KeySelector<String, String> {
    @Override
    public String getKey(String admin) throws Exception {
        // key extraction...
        return admin;
    }
}
```

```java
public class OrderAndAdminConnect extends KeyedCoProcessFunction<String, String, String, String> {
    @Override
    public void processElement1(String s, Context context, Collector<String> collector) throws Exception {
        // order-stream records...
    }

    @Override
    public void processElement2(String s, Context context, Collector<String> collector) throws Exception {
        // user-stream records...
    }
}
```
