更新记录
时间 | 更新内容 | 更新人 |
---|---|---|
2022-10-31 | 文档创建 | 付飞 |
2022-11-01 | 增加默认mainArgsConfig配置类解析 | 付飞 |
2022-11-02 | 1. 配置项必要与非必要参数识别,并增加非必要默认值 2. 增加Jdbc、Print、Kafka三种Sink组件支持 3. 增加通过main参数Flow构建流程定义并启动相应应用 4. 其它细节调整 |
付飞 |
2022-11-07 | 1. 支持kafka source序列化定义及类型支持 2. 增加shareStreamName属性定义 |
付飞 |
2022-11-09 | 1. 增加merge…算子优化支持 2. 更新部分算子处理逻辑及部分bug |
付飞 |
2022-11-10 | 增加脚手架编排样例(持续更新) | 付飞 |
2022-11-15 | 增加JsonMapper增加算子(算子转换规则将持续更新) | 付飞 |
2022-12-19 | 增加transform、sink原生代码解析支持 | 付飞 |
2023-01-04 | 增加checkpoint是否开启配置 增加kafka source checkpoint 提交 offset 配置 |
付飞 |
2023-01-16 | 增加任务全局并行度设置 增加本地调试开启webui |
付飞 |
Hello World
hello world案例中,我们将消费kafka数据(sink),并计算每条数据的长度(transform),而后打印至控制台(sink)。
1. 引入脚本架依赖
<dependency>
<groupId>com.zxelec</groupId>
<artifactId>flink-scaffold</artifactId>
<version>1.1</version>
</dependency>
2. 定义启动类
package com.zxelec.demo;
import com.zxelec.demo.process.SizeMap;
import com.zxelec.scaffold.annotation.Flow;
import com.zxelec.scaffold.annotation.Sink;
import com.zxelec.scaffold.annotation.Source;
import com.zxelec.scaffold.annotation.Stream;
import com.zxelec.scaffold.annotation.Transform;
import com.zxelec.scaffold.app.Application;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@Flow(
streams = {
@Stream(
sources = {
@Source(
sourceFunction = KafkaSource.class,
configPrefix = "demo"
)
},
transforms = {
@Transform(
transformFunction = SizeMap.class
)
},
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
)
}
)
public class HelloWorldApplication extends Application {
}
SizeMap.java
package com.zxelec.demo.process;
import org.apache.flink.api.common.functions.MapFunction;
public class SizeMap implements MapFunction<String, Integer> {
@Override
public Integer map(String json) throws Exception {
return json.length();
}
}
3. 定义配置文件
# 应用名称
app.name=HelloWorld
# 基础环境配置
env.checkpoint.enable=true
env.checkpoint.interval=30
env.checkpoint.directory=file:///Users/felix/Desktop/ck
env.restart.attempts=3
env.restart.delay.interval=10
# kafka
demo.name=car_stream
demo.bootstrap.servers=192.168.1.249:9092
demo.topic=car_stream
demo.group.id=demo_group_01
demo.auto.offset.reset=earliest
demo.enable.auto.commit=false
4. 启动参数
--MainClass com.zxelec.demo.HelloWorldApplication --ConfigPath ${application.properties文件路径}
完成以上步骤即可启动一个读取kafka数据,统计每条数据长度,输出打印至控制台的Flink应用程序!
注解标签
@Flow
@Flow用于定义一个Flink应用程序。 #### mainArgsConfig(可选参数) 通过启动参数mainArgs传入参数值的解析类,将参数值放入应用程序配置容器ParameterTool中,供配置型增强算子(ConfigMapFunction等)获取做业务处理。 解析类需要实现MainArgsConfig接口,接口定义如下:
@Flow(
mainArgsConfig = <? extends MainArgsConfig.class>,
streams = {
@Stream(),
...
}
)
java
/**
* @author: Felix
* @description: 解析main args传参
* @date: 15:45 2022/10/25
* @version: 1.0
*/
public interface MainArgsConfig {
/**
* 解析配置项
* @param args
* @return
*/
HashMap<String, String> process(String mainArgs);
}
备注:此版本脚手架已提供默认mainArgs参数解析类,可自动解析单层kv结构JSON格式数据,将其封闭入配置容器ParameterTool中。使用时无需显示增加 mainArgsConfig = MainArgsConfig.class 类似定义。
{
"name": "zhangsan",
"age": 18
}
streams(必需参数)
用于定义一个或多个Flink流处理节点@Stream(),脚本架解析时会根据定义顺序处理。当一个节点@Stream数据源依赖其它节点@Stream过程流时,请注意定义的顺序。
@Stream
@Stream用于定义一个Flink流处理节点 #### sources(必需参数) 用于定义一个或多个数据源@Source()
@Stream(
sources = {
@Source(),
...
},
transforms = {
@Transform(),
...
},
sinks = {
@Sink(),
...
}
)
transforms(可选参数)
用于定义一个或多个数据转换算子@Transform()
sinks(可选参数)
用于定义一个或多个输出源@Sink()
@Source
@Source用于定义一个具体的数据源 #### sourceFunction(可选参数) 获取所需数据源class类型。脚本架提供了大部分的常用数据源,数据源class + 配置项的规则使用。如需自定义数据源,实现Flink框架提供的SourceFunction接口或其抽象子类即可。 #### configPrefix(可选参数) 用于数据源实例中区分获取需要的配置项。(注意:自定义数据源暂未支持此功能) #### streamName(可选参数) 所需数据源为某Flink处理节点的中间流标识名称。(@Transform注解定义的streamNames属性值为中间流标识名) #### shareStreamName(可选参数) 共享数据流标识名,当其它流处理节点需要使用时定义。
@Source(
sourceFunction = <? extends sinkFunction>,
configPrefix = "",
streamName = "",
shareStreamName = ""
)
@Transform
@Transform(
transformFunction = <? extends Function>,
shareStreamNames = {},
isEnable = {}
)
� @Transform用于定义一个转换算子。当定义多个数据源时,为了区分同一算子不同数据源不同的业务处理,请定义数据源时增加configPrefix属性,定义转换为配置增加型算子,及可区别不同数据源进行实例化。(可查看文档案例部分)
transformFunction(必需参数)
转换算子class类型。(见转换算子章节)
shareStreamNames(可选参数)
共享数据流标识名,当其它流处理节点需要使用时定义。
isEnable(可选参数)
在脚手架解析时,根据此配置读取应用配置${isEnable}.enable,判断是否启用该转换算子进行处理。(例:数据源为一个,isEable = {“map”},则此转换会读取应用配置项中 map.enable 判断是否进行流转换操作)
@Sink
@Sink用于定义一个具体的输出源。
@Sink(
sinkFunction = <? extends SinkFunction>,
configPrefix = "",
)
sinkFunction(必需参数)
获取所需输出源class类型。
configPrefix(可选参数)
用于输出源实例中区分获取需要的配置项。
启动命令
注解方式应用启动
RestApi启动时:entryClass为com.zxelec.scaffold.app.Application
—MainClass {应用启动类全类名} [—ConfigPath {配置文件绝对路径}] [—MainArgs {main参数}]�样例:
—MainClass com.zxelec.demo.HelloWorldApplication —ConfigPath /Users/felix/config/application.properties —MainArgs {\”name\”:”felix”}参数方式应用启动
RestApi启动时:entryClass为com.zxelec.scaffold.app.MainArgsApplication
—Flow {流程JSON} [—ConfigPath {配置文件绝对路径}] [—MainArgs {main参数}]�样例:
—Flow {…} —ConfigPath /Users/felix/config/application.properties —MainArgs {\”name\”:”felix”}流程JSON格式规则样例为:
{
"mainArgsConfig": "com.zxelec.relation.config.LoadMainArgsConfig"
"streams": [
{
"sources": [
{
"sourceFunction": "org.apache.flink.connector.kafka.source.KafkaSource",
"configPrefix": "stream1"
},
{
"sourceFunction": "org.apache.flink.connector.kafka.source.KafkaSource",
"configPrefix": "stream2"
}
],
"transforms": [
{
"transformFunction": "com.zxelec.relation.component.DataStandardMap"
},
{
"transformFunction": "com.zxelec.relation.component.DataStandardFilter"
}
],
"sinks": [
{
"sinkFunction": "com.zxelec.relation.component.RelationSink",
"configPrefix": "relation"
}
]
},
{
...
}
]
}
�
脚本架组件
Source
Kafka
sourceFunction = KafkaSource.class
/**
* [{configPrefix}.]kafka.name=car_stream // 必需
* [{configPrefix}.]kafka.bootstrap.servers=192.168.1.249:9092 // 必需
* [{configPrefix}.]kafka.topic=car_stream // 必需
* [{configPrefix}.]kafka.group.id=demo01 // 必需
* [{configPrefix}.]kafka.auto.offset.reset=earliest // 必需
* [{configPrefix}.]kafka.enable.auto.commit=false // 必需
* [{configPrefix}.]kafka.request.timeout.ms=10000 // 默认值:60000
* [{configPrefix}.]kafka.session.timeout.ms=10000 // 默认值:40000
* [{configPrefix}.]kafka.commit.offsets.on.checkpoint=false // 默认值false
* --- kafka.deserialization.type start ---
* kafka.deserialization.type: simple/KVTimestamp
*
* simple: 获取kafka value 封装为 String 类型数据
* KVTimestamp: 获取kafka key/value/timestamp,并按顺序封装为 Tuple3<String, String, Long> 类型数据
*
* [{configPrefix}.]kafka.deserialization.type=simple // 默认值:simple
* --- kafka.deserialization.type end ---
* 等待更新...
*/
Transform
原生算子
Flink原生算子:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/
脚手架算子:
/**
* 组合keyBy及keyedProcess
* @param <K>
* @param <I>
* @param <O>
*/
public abstract class MergeKeyedProcessFunction<K, I, O> extends KeyedProcessFunction<K, I, O> {
public abstract KeySelector<I, K> keyBy();
}
Config增加型算子
config增加型算子在原生算子的基础上,可获取应用程序启动时加载的配置信息,包括应用环境配置及业务配置。使用时继承相应的配置算子即可。使用样例如下:
package com.zxelec.demo.process;
import com.zxelec.scaffold.component.ConfigMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
public class DemoConfigMap extends ConfigMapFunction<String, String> {
public DemoConfigMap(ParameterTool parameterTool, String configPrefix) {
super(parameterTool, configPrefix);
// parameterTool为应用程序配置容器
// configPrefix为算子处理的输入流定义的属性
}
@Override
public String map(String o) throws Exception {
return null;
}
}
目前脚手架已支持的增加型算子有:
ConfigFilterFunction
�ConfigFlatMapFunction
�ConfigKeyedCoProcessFunction
�ConfigKeyedProcessFunction
�ConfigKeySelector
�ConfigMapFunction
�ConfigProcessJoinFunction
�ConfigSerializableTimestampAssigner
ConfigSinkFunction�
ConfigMergeKeyedProcessFunction
JsonMapper
JsonMapper在ConfigMapFunction算子的基础上,以提供json映射文件或映射参数,自动完成json源字符串至json目标字符串的转换处理。
@Flow(
streams = {
@Stream(
sources = {
@Source(
sourceFunction = KafkaSource.class
)
},
transforms = {
@Transform(
transformFunction = JsonMapper.class
)
},
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
),
}
)
public class JsonApplication extends Application {
}
app.name=HelloWorld
# 基础环境配置
env.checkpoint.interval=30
env.checkpoint.directory=file:///Users/reality/Desktop/ck
env.restart.attempts=3
env.restart.delay.interval=10
# kafka
kafka.name=imsi01
kafka.bootstrap.servers=192.168.1.249:9092
kafka.topic=imsi01
kafka.group.id=demo00
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
# 指定映射文件路径 (优先级低)
json.mapper.file=/Users/reality/zx/project/flink-bootstrap/flink-bootstrap-component/flink-bootstrap-component-demo/imsi.json
# 指定映射内容 (优先级高)
json.mapper.param=
�
映射项说明: targetName:输出字段名 jsonPath:jsonPath语法提取输出值 transformType:输出值转换类型 transformRule: 转换规则
[
{
"targetName": "objectId",
"jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PlateNo,$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PlateColor",
// join转换:提取多项源数据字段值,以transformRule.join字符串进行拼接。
// 注意:多项源字段名以jsonPath语法用逗号拼接赋值于jsonPath字段
"transformType": "join",
"transformRule": {
"join": "_"
}
},
{
"targetName": "trailTime",
"jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PassTimer",
// 将源数据时间字符串字段值,转换为其它日期格式字符串
"transformType": "dateTime",
"transformRule": {
"sourceDateFormat": "yyyyMMddHHmmssSSS",
"targetDateFormat": "yyyy-MM-dd HH:mm:ss"
}
},
{
// 提取源数据值,不做转换处理
"targetName": "enterTime",
"jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PassTime"
},
{
"targetName": "trailRemark",
// 新增字段并赋值transformRule.defaultValue
"transformType": "default",
"transformRule": {
"defaultValue": "过车数据"
}
},
{
"targetName": "processTime",
"jsonPath": "$.shotTime",
// 将源数据时间戳时间格式化至指定日期格式字符串
"transformType": "timeStamp",
"transformRule": {
"dateformat": "yyyy-MM-dd HH:mm:ss"
}
}
// 等待更新...
]
NativeTransform
nativeTransform中可支持书写flink原生代码,以补充脚手架最大的业务灵活性。使用样例如下:�
public class RelationNativeTransform implements NativeTransform {
/**
* 业务处理
* @param dataStreamList Tuple<流,流的配置前缀>
* @param config 配置容器
* @return 返回形式与 @param dataStreamList 相同,请保存流及配置顺序。否则会造成后续业务处理异常
*/
@Override
public List<Tuple2<DataStream, String>> process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
// TODO... 业务书写
return null;
}
}
Sink
JdbcSink
- 实现JdbcSink接口并根据自己业务完成接口方法。
package com.zxelec.demo.process;
import com.zxelec.scaffold.component.sink.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
/**
* @author: Felix
* @description: DemoJdbcSink
* @date: 16:02 2022/11/2
* @version: 1.0
*/
public class DemoJdbcSink implements JdbcSink<Relation> {
@Override
public String prepareStatement() {
return "INSERT INTO relation VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
}
@Override
public JdbcStatementBuilder<Relation> setParameters() {
return (statement, relation) -> {
statement.setString(1, relation.getLayoutObjectId());
statement.setString(2, relation.getAccompanyObjectId());
statement.setLong(3, relation.getLayoutEventTime()/1000);
statement.setLong(4, relation.getAccompanyEventTime()/1000);
statement.setString(5, relation.getLayoutAddressId());
statement.setString(6, relation.getAccompanyAddressId());
statement.setString(7, relation.getLayoutObjectType());
statement.setString(8, relation.getAccompanyObjectType());
statement.setString(9, relation.getGid());
statement.setString(10, relation.getLayoutRemark());
statement.setString(11, relation.getAccompanyRemark());
statement.setLong(12, System.currentTimeMillis()/1000);
};
}
}
- 增加数据库相关配置项
[{configPrefix}.]jdbc.url=jdbc:clickhouse://192.168.1.101:8123/demo
[{configPrefix}.]jdbc.username=zhangsan
[{configPrefix}.]jdbc.password=123456
[{configPrefix}.]jdbc.driver.name=com.clickhouse.jdbc.ClickHouseDriver
# jdbc.data.batch.size 默认值:400
[{configPrefix}.]jdbc.data.batch.size=400
# jdbc.data.batch.interval.ms 默认值200
[{configPrefix}.]jdbc.data.batch.interval.ms=200
- 应用配置
sinks = {
@Sink(
sinkFunction = DemoJdbcSink.class
)
}
PrintSinkFunction
- 应用配置:
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
�KafkaSink
- 增加kafka sink相关配置
[{configPrefix}.]kafka.sink.brokers=192.168.1.1
[{configPrefix}.]kafka.sink.topic=demo_topic
# exactly-once/at-least-once 默认值:at-least-once
[{configPrefix}.]kafka.sink.mode=at-least-once
- 应用配置
sinks = {
@Sink(
sinkFunction = KafkaSink.class
)
}
NativeSink
nativeSink中可支持书写flink原生代码,以补充脚手架最大的业务灵活性。使用样例如下:�
public class RelationNativeSink implements NativeSink {
/**
* 业务处理
* @param dataStreamList dataStreamList Tuple<流,流的配置前缀>
* @param config 配置容器
*/
@Override
public void process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
// TODO... 业务书写
}
}
环境配置项
# 应用程序名称(默认值:defaultStreamApp)
app.name=HelloWorld
# -------- parallelism config start --------
# 任务全局并行度
env.parallelism=1(默认值:1)
# -------- parallelism config end --------
# -------- 本地调试开启webui config start --------
env.local.web.ui.enable=true(默认值:false)
env.local.web.ui.port=8081(默认值:8081)
# 配置后maven依赖增加以下项即可,访问路径 localhost:8081
# <dependency>
# <groupId>org.apache.flink</groupId>
# <artifactId>flink-runtime-web_2.12</artifactId>
# <version>1.14.6</version>
# </dependency>
# -------- 本地调试开启webui config end --------
# -------- checkpoint config start --------
# 是否开启checkpoint(true、false。默认值:false)
env.checkpoint.enable=true
# 模式(exactly_once、at_least_once。默认值:at_least_once)
env.checkpoint.mode=exactly_once
# checkpoint触发时间间隔(单位:毫秒。默认值:600000)
env.checkpoint.interval=60000
# checkpoint存储目录(必需)
env.checkpoint.directory=file:///Users/reality/Desktop/ck
# 失败重试次数(默认值:3)
env.restart.attempts=5
# 失败重试间隔(单位:秒。默认值:10)
env.restart.delay.interval=30
# -------- checkpoint config end --------
# 等待更新...
脚手架编程编排样例
样例1:简单的单流处理
@Flow(
streams = {
@Stream(
sources = {
@Source(
sourceFunction = KafkaSource.class
)
},
transforms = {
@Transform(
transformFunction = DemoMap.class
)
},
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
),
}
)
public class Demo1Application extends Application {
}
# kafka source config
kafka.name=demo01
kafka.bootstrap.servers=192.168.1.1:9092
kafka.topic=demo01
kafka.group.id=demo01
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
public class DemoMap implements MapFunction<String, Integer> {
@Override
public Integer map(String value) throws Exception {
// 业务处理...
return value.length();
}
}
样例2:自定义解析Main Args,单流及其过程流转换处理
@Flow(
mainArgsConfig = CsvMainArgs.class,
streams = {
@Stream(
sources = {
@Source(
sourceFunction = KafkaSource.class
)
},
transforms = {
@Transform(
transformFunction = DemoMap.class,
// 定义转换处理之后的流标识,作为其它任务数据源使用
shareStreamNames = {"mapStream"}
)
},
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
),
@Stream(
sources = {
@Source(
// 引用之前任务某阶段的流标识,做为此任务的数据源
streamName = "mapStream"
)
},
transforms = {
@Transform(
transformFunction = SizeMap.class
)
},
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
),
}
)
public class Demo2Application extends Application {
}
# kafka source config
kafka.name=demo01
kafka.bootstrap.servers=192.168.1.1:9092
kafka.topic=demo01
kafka.group.id=demo01
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
public class CsvMainArgs implements MainArgsConfig {
@Override
public HashMap<String, String> process(String args) {
// TODO 解析逻辑...
return null;
}
}
public class DemoMap implements MapFunction<String, Integer> {
@Override
public Integer map(String value) throws Exception {
// 业务处理...
return value.length();
}
}
public class SizeMap implements MapFunction<String, Integer> {
@Override
public Integer map(String json) throws Exception {
return json.length();
}
}
样例3:双流转换,并连接处理
@Flow(
mainArgsConfig = CsvMainArgs.class,
streams = {
// 任务1 获取订单流数据,转换并对流分组
@Stream(
sources = {
@Source(
sourceFunction = KafkaSource.class,
configPrefix = "order"
)
},
transforms = {
@Transform(
transformFunction = OrderInfo.class
),
@Transform(
transformFunction = OrderKey.class,
// 定义此转换结果流标识,供之后任务作为数据源使用
shareStreamNames = "orderKeyStream"
)
}
),
// 任务2 获取用户流数据,转换并对流分组
@Stream(
sources = {
@Source(
sourceFunction = KafkaSource.class,
configPrefix = "admin"
)
},
transforms = {
@Transform(
transformFunction = AdminInfo.class
),
@Transform(
transformFunction = AdminKey.class,
// 定义此转换结果流标识,供之后任务作为数据源使用
shareStreamNames = "adminKeyStream"
)
}
),
// 任务3 获取分组后的订单流及用户流,进行相同分组数据业务处理
@Stream(
sources = {
@Source(
streamName = "orderKeyStream"
),
@Source(
streamName = "adminKeyStream"
)
},
transforms = {
@Transform(
transformFunction = OrderAndAdminConnect.class
)
},
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
),
}
)
public class Demo3Application extends Application {
}
# order kafka source config
kafka.name=order
kafka.bootstrap.servers=192.168.1.1:9092
kafka.topic=order_topic
kafka.group.id=order01
kafka.auto.offset.reset=earliest
kafka.enable.auto.commit=false
# admin kafka source config
admin.kafka.name=admin
admin.kafka.bootstrap.servers=192.168.1.1:9092
admin.kafka.topic=admin_topic
admin.kafka.group.id=admin01
admin.kafka.auto.offset.reset=earliest
admin.kafka.enable.auto.commit=false
public class OrderInfo implements MapFunction<String, String> {
@Override
public String map(String order) throws Exception {
// 业务处理...
return order;
}
}
public class OrderKey implements KeySelector<String, String> {
@Override
public String getKey(String order) throws Exception {
// key 获取处理...
return order;
}
}
public class AdminInfo implements MapFunction<String, String> {
@Override
public String map(String admin) throws Exception {
// 业务处理...
return admin;
}
}
public class AdminKey implements KeySelector<String, String> {
@Override
public String getKey(String admin) throws Exception {
// key 获取处理...
return admin;
}
}
public class OrderAndAdminConnect extends KeyedCoProcessFunction<String, String, String, String> {
@Override
public void processElement1(String s, Context context, Collector<String> collector) throws Exception {
// 订单流数据...
}
@Override
public void processElement2(String s, Context context, Collector<String> collector) throws Exception {
// 用户流数据...
}
}