更新记录
时间 | 更新内容 | 更新人 |
---|---|---|
2022-10-28 | 文档创建 | 付飞 |
Hello World
1. 引入脚本架依赖
<dependency>
<groupId>com.zxelec</groupId>
<artifactId>flink-scaffold</artifactId>
<version>1.0</version>
</dependency>
2. 定义启动类
package com.zxelec.demo;
import com.zxelec.demo.process.SizeMap;
import com.zxelec.scaffold.annotation.Flow;
import com.zxelec.scaffold.annotation.Sink;
import com.zxelec.scaffold.annotation.Source;
import com.zxelec.scaffold.annotation.Stream;
import com.zxelec.scaffold.annotation.Transform;
import com.zxelec.scaffold.app.Application;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@Flow(
streams = {
@Stream(
sources = {
@Source(
sourceFunction = KafkaSource.class,
configPrefix = "demo"
)
},
transforms = {
@Transform(
transformFunction = SizeMap.class
)
},
sinks = {
@Sink(
sinkFunction = PrintSinkFunction.class
)
}
)
}
)
public class HelloWorldApplication extends Application {
}
SizeMap.java
package com.zxelec.demo.process;
import org.apache.flink.api.common.functions.MapFunction;
public class SizeMap implements MapFunction<String, Integer> {
@Override
public Integer map(String json) throws Exception {
return json.length();
}
}
3. 定义配置文件
# 标识是否为必须及默认值
app.name=HelloWorld
# 基础环境配置
env.checkpoint.interval=30
env.checkpoint.directory=file:///Users/felix/Desktop/ck
env.restart.attempts=3
env.restart.delay.interval=10
# kafka
demo.name=car_stream
demo.bootstrap.servers=192.168.1.249:9092
demo.topic=car_stream
demo.group.id=demo_group_01
demo.auto.offset.reset=earliest
demo.enable.auto.commit=false
4. 启动参数
--MainClass com.zxelec.demo.HelloWorldApplication --configPath ${application.properties文件路径}
完成以上步骤即可启动一个读取kafka数据,统计每条数据长度,输出打印至控制台的Flink应用程序!
注解标签
@Flow
@Flow用于定义一个Flink应用程序。 #### mainArgsConfig 通过启动参数mainArgs传入参数值的解析类,将参数值放入应用程序配置容器ParameterTool中,供配置型增强算子(ConfigMapFunction等)获取做业务处理。 解析类需要实现MainArgsConfig接口,接口定义如下:
@Flow(
mainArgsConfig = MainArgsConfig.class,
streams = {
@Stream(),
...
}
)
java
/**
* @author: Felix
* @description: 解析main args传参
* @date: 15:45 2022/10/25
* @version: 1.0
*/
public interface MainArgsConfig {
/**
* 解析配置项
* @param args
* @return
*/
HashMap<String, String> process(String mainArgs);
}
#### streams
用于定义一个或多个Flink流处理节点@Stream(),脚本架解析时会根据定义顺序处理。当一个节点@Stream数据源依赖其它节点@Stream过程流时,请注意定义的顺序。
### @Stream
java
@Stream(
sources = {
@Source(),
...
},
transforms = {
@Transform(),
...
},
sinks = {
@Sink(),
...
}
)
@Stream用于定义一个Flink流处理节点
#### sources
用于定义一个或多个数据源@Source()
transforms
用于定义一个或多个数据转换算子@Transform()
sinks
用于定义一个或多个输出源@Sink()
@Source
@Source用于定义一个具体的数据源 #### sourceFunction 获取所需数据源class类型。脚本架提供了大部分的常用数据源,数据源class + 配置项的规则使用。如需自定义数据源,实现Flink框架提供的SourceFunction接口或其抽象子类即可。 #### configPrefix 用于数据源实例中区分获取需要的配置项。(注意:自定义数据源暂未支持此功能) #### streamName 所需数据源为某Flink处理节点的中间流标识名称。(@Transform注解定义的streamNames属性值为中间流标识名)
@Source(
sourceFunction = KafkaSource.class,
configPrefix = "",
streamName = ""
)
/**
* KafkaSource为脚手架提供的数据源
* 配置项为:
* {configPrefix}.name=
* {configPrefix}.bootstrap.servers=
* {configPrefix}.topic=
* {configPrefix}.group.id=
* {configPrefix}.auto.offset.reset=
* {configPrefix}.enable.auto.commit=
*
*/
@Transform
@Transform(
transformFunction = Map.class,
configPrefix = {},
streamNames = {},
isEnable = {}
)
� @Transform用于定义一个转换算子。因一个转换算子可应用到多条数据源上,故其属性配置项常为数组形式,以便针对不同数据源提供准确的定义。
transformFunction
转换算子class类型。(见转换算子章节)
configPrefix
用于算子实例中获取需要的配置项。
streamNames
转换后的数据流标识名,当其它流处理节点需要使用时,可进行定义。
isEnable
在脚手架解析时,根据此配置读取应用配置${isEnable}.enable,判断是否启用该转换算子进行处理。(例:数据源为一个,isEable = {“map”},则此转换会读取应用配置项中 map.enable 判断是否进行流转换操作)
@Sink
@Sink用于定义一个具体的输出源。
@Sink(
sinkFunction = PrintSinkFunction.class
configPrefix = {}
)
sinkFunction
获取所需输出源class类型。目前脚手架只提供了JDBC连接方式,继承ConfigSinkFunction
�并实现SinkFactory接口,使用样例如下。
package com.zxelec.relation.component;
import com.zxelec.relation.entity.Relation;
import com.zxelec.scaffold.component.ConfigSinkFunction;
import com.zxelec.scaffold.component.SinkFactory;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
* @author: Felix
* @description: 关系散存存储
* @date: 15:45 2022/9/27
* @version: 1.0
*/
public class RelationSink extends ConfigSinkFunction implements SinkFactory {
private String ip;
private String port;
private String database;
private String username;
private String password;
private Integer dataBatchSize;
private Integer dataBatchIntervalMs;
public RelationSink(ParameterTool parameterTool, String configPrefix) {
super(parameterTool, configPrefix);
this.ip = parameterTool.get("database.ip");
this.port = parameterTool.get("database.port");
this.database = parameterTool.get("database");
this.username = parameterTool.get("database.user");
this.password = parameterTool.get("database.password");
this.dataBatchSize = parameterTool.getInt(configPrefix + ".data.batch.size");
this.dataBatchIntervalMs = parameterTool.getInt(configPrefix + ".data.batch.interval.ms");
}
@Override
public SinkFunction createSinkFunction() {
SinkFunction<Relation> sink = JdbcSink.sink(
"INSERT INTO relation VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(statement, relation) -> {
statement.setString(1, relation.getLayoutObjectId());
statement.setString(2, relation.getAccompanyObjectId());
statement.setLong(3, relation.getLayoutEventTime()/1000);
statement.setLong(4, relation.getAccompanyEventTime()/1000);
statement.setString(5, relation.getLayoutAddressId());
statement.setString(6, relation.getAccompanyAddressId());
statement.setString(7, relation.getLayoutObjectType());
statement.setString(8, relation.getAccompanyObjectType());
statement.setString(9, relation.getGid());
statement.setString(10, relation.getLayoutRemark());
statement.setString(11, relation.getAccompanyRemark());
statement.setLong(12, System.currentTimeMillis()/1000);
},
JdbcExecutionOptions.builder()
.withBatchSize(dataBatchSize)
.withBatchIntervalMs(dataBatchIntervalMs)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://" + ip + ":" + port + "/" + database + "?autoReconnect=true&maxReconnects=5&initialTimeout=10&connectTimeout=30000&socketTimeout=30000")
.withDriverName("com.clickhouse.jdbc.ClickHouseDriver")
.withUsername(username)
.withPassword(password)
.build()
);
return sink;
}
}
configPrefix
用于输出源实例中获取需要的配置项。
启动命令
—MainClass {应用启动类全类名} —configPath {配置文件绝对路径} —mainArgs {main参数}�样例:
—MainClass com.zxelec.demo.HelloWorldApplication —configPath /Users/felix/config/application.properties —mainArgs {\”name\”:”felix”}�
脚本架组件
Source
Kafka
sourceFunction = KafkaSource.class
/**
* {configPrefix}.name=car_stream //kafka流名称,用于定义flink查看指标的标识名
* {configPrefix}.bootstrap.servers=192.168.1.249:9092
* {configPrefix}.topic=car_stream
* {configPrefix}.group.id=demo01
* {configPrefix}.auto.offset.reset=earliest
* {configPrefix}.enable.auto.commit=false
*/
Transform
原生算子
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/
Config增加型算子
config增加型算子在原生算子的基础上,可获取应用程序启动时加载的配置信息,包括应用环境配置及业务配置。使用时继承相应的配置算子即可。使用样例如下:
package com.zxelec.demo.process;
import com.zxelec.scaffold.component.ConfigMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
public class DemoConfigMap extends ConfigMapFunction<String, String> {
public DemoConfigMap(ParameterTool parameterTool, String configPrefix) {
super(parameterTool, configPrefix);
// parameterTool为应用程序配置容器
// configPrefix为算子流程定义时的名称
}
@Override
public String map(String o) throws Exception {
return null;
}
}
目前脚手架已支持的增加型算子有:
ConfigFilterFunction
�ConfigFlatMapFunction
�ConfigKeyedCoProcessFunction
�ConfigKeyedProcessFunction
�ConfigKeySelector
�ConfigMapFunction
�ConfigProcessJoinFunction
�ConfigSerializableTimestampAssigner
ConfigSinkFunction�
Sink
等待更新…
环境配置项
# 应用程序名称
app.name=HelloWorld
# checkpoint触发时间间隔
env.checkpoint.interval=30
# checkpoint存储目录
env.checkpoint.directory=file:///Users/reality/Desktop/ck
# 失败重试次数
env.restart.attempts=3
# 失败重试间隔
env.restart.delay.interval=10
# 等待更新...