更新记录

时间 更新内容 更新人
2022-10-28 文档创建 付飞

Hello World

1. 引入脚本架依赖

  1. <dependency>
  2. <groupId>com.zxelec</groupId>
  3. <artifactId>flink-scaffold</artifactId>
  4. <version>1.0</version>
  5. </dependency>

2. 定义启动类

  1. package com.zxelec.demo;
  2. import com.zxelec.demo.process.SizeMap;
  3. import com.zxelec.scaffold.annotation.Flow;
  4. import com.zxelec.scaffold.annotation.Sink;
  5. import com.zxelec.scaffold.annotation.Source;
  6. import com.zxelec.scaffold.annotation.Stream;
  7. import com.zxelec.scaffold.annotation.Transform;
  8. import com.zxelec.scaffold.app.Application;
  9. import org.apache.flink.connector.kafka.source.KafkaSource;
  10. import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
  11. @Flow(
  12. streams = {
  13. @Stream(
  14. sources = {
  15. @Source(
  16. sourceFunction = KafkaSource.class,
  17. configPrefix = "demo"
  18. )
  19. },
  20. transforms = {
  21. @Transform(
  22. transformFunction = SizeMap.class
  23. )
  24. },
  25. sinks = {
  26. @Sink(
  27. sinkFunction = PrintSinkFunction.class
  28. )
  29. }
  30. )
  31. }
  32. )
  33. public class HelloWorldApplication extends Application {
  34. }

SizeMap.java

  1. package com.zxelec.demo.process;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. public class SizeMap implements MapFunction<String, Integer> {
  4. @Override
  5. public Integer map(String json) throws Exception {
  6. return json.length();
  7. }
  8. }

3. 定义配置文件

  1. # 标识是否为必须及默认值
  2. app.name=HelloWorld
  3. # 基础环境配置
  4. env.checkpoint.interval=30
  5. env.checkpoint.directory=file:///Users/felix/Desktop/ck
  6. env.restart.attempts=3
  7. env.restart.delay.interval=10
  8. # kafka
  9. demo.name=car_stream
  10. demo.bootstrap.servers=192.168.1.249:9092
  11. demo.topic=car_stream
  12. demo.group.id=demo_group_01
  13. demo.auto.offset.reset=earliest
  14. demo.enable.auto.commit=false

4. 启动参数

  1. --MainClass com.zxelec.demo.HelloWorldApplication --configPath ${application.properties文件路径}

完成以上步骤即可启动一个读取kafka数据,统计每条数据长度,输出打印至控制台的Flink应用程序!

注解标签

@Flow

  1. @Flow(
  2. mainArgsConfig = MainArgsConfig.class,
  3. streams = {
  4. @Stream(),
  5. ...
  6. }
  7. )
@Flow用于定义一个Flink应用程序。 #### mainArgsConfig 通过启动参数mainArgs传入参数值的解析类,将参数值放入应用程序配置容器ParameterTool中,供配置型增强算子(ConfigMapFunction等)获取做业务处理。 解析类需要实现MainArgsConfig接口,接口定义如下: java /** * @author: Felix * @description: 解析main args传参 * @date: 15:45 2022/10/25 * @version: 1.0 */ public interface MainArgsConfig { /** * 解析配置项 * @param args * @return */ HashMap<String, String> process(String mainArgs); } #### streams 用于定义一个或多个Flink流处理节点@Stream(),脚本架解析时会根据定义顺序处理。当一个节点@Stream数据源依赖其它节点@Stream过程流时,请注意定义的顺序。 ### @Stream java @Stream( sources = { @Source(), ... }, transforms = { @Transform(), ... }, sinks = { @Sink(), ... } ) @Stream用于定义一个Flink流处理节点 #### sources 用于定义一个或多个数据源@Source()

transforms

用于定义一个或多个数据转换算子@Transform()

sinks

用于定义一个或多个输出源@Sink()

@Source

  1. @Source(
  2. sourceFunction = KafkaSource.class,
  3. configPrefix = "",
  4. streamName = ""
  5. )
  6. /**
  7. * KafkaSource为脚手架提供的数据源
  8. * 配置项为:
  9. * {configPrefix}.name=
  10. * {configPrefix}.bootstrap.servers=
  11. * {configPrefix}.topic=
  12. * {configPrefix}.group.id=
  13. * {configPrefix}.auto.offset.reset=
  14. * {configPrefix}.enable.auto.commit=
  15. *
  16. */
@Source用于定义一个具体的数据源 #### sourceFunction 获取所需数据源class类型。脚本架提供了大部分的常用数据源,数据源class + 配置项的规则使用。如需自定义数据源,实现Flink框架提供的SourceFunction接口或其抽象子类即可。 #### configPrefix 用于数据源实例中区分获取需要的配置项。(注意:自定义数据源暂未支持此功能) #### streamName 所需数据源为某Flink处理节点的中间流标识名称。(@Transform注解定义的streamNames属性值为中间流标识名)

@Transform

  1. @Transform(
  2. transformFunction = Map.class,
  3. configPrefix = {},
  4. streamNames = {},
  5. isEnable = {}
  6. )

@Transform用于定义一个转换算子。因一个转换算子可应用到多条数据源上,故其属性配置项常为数组形式,以便针对不同数据源提供准确的定义。

transformFunction

转换算子class类型。(见转换算子章节)

configPrefix

用于算子实例中获取需要的配置项。

streamNames

转换后的数据流标识名,当其它流处理节点需要使用时,可进行定义。

isEnable

在脚手架解析时,根据此配置读取应用配置${isEnable}.enable,判断是否启用该转换算子进行处理。(例:数据源为一个,isEable = {“map”},则此转换会读取应用配置项中 map.enable 判断是否进行流转换操作)

@Sink

  1. @Sink(
  2. sinkFunction = PrintSinkFunction.class
  3. configPrefix = {}
  4. )
@Sink用于定义一个具体的输出源。

sinkFunction

获取所需输出源class类型。目前脚手架只提供了JDBC连接方式,继承ConfigSinkFunction

�并实现SinkFactory接口,使用样例如下。

  1. package com.zxelec.relation.component;
  2. import com.zxelec.relation.entity.Relation;
  3. import com.zxelec.scaffold.component.ConfigSinkFunction;
  4. import com.zxelec.scaffold.component.SinkFactory;
  5. import org.apache.flink.api.java.utils.ParameterTool;
  6. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  7. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  8. import org.apache.flink.connector.jdbc.JdbcSink;
  9. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  10. /**
  11. * @author: Felix
  12. * @description: 关系散存存储
  13. * @date: 15:45 2022/9/27
  14. * @version: 1.0
  15. */
  16. public class RelationSink extends ConfigSinkFunction implements SinkFactory {
  17. private String ip;
  18. private String port;
  19. private String database;
  20. private String username;
  21. private String password;
  22. private Integer dataBatchSize;
  23. private Integer dataBatchIntervalMs;
  24. public RelationSink(ParameterTool parameterTool, String configPrefix) {
  25. super(parameterTool, configPrefix);
  26. this.ip = parameterTool.get("database.ip");
  27. this.port = parameterTool.get("database.port");
  28. this.database = parameterTool.get("database");
  29. this.username = parameterTool.get("database.user");
  30. this.password = parameterTool.get("database.password");
  31. this.dataBatchSize = parameterTool.getInt(configPrefix + ".data.batch.size");
  32. this.dataBatchIntervalMs = parameterTool.getInt(configPrefix + ".data.batch.interval.ms");
  33. }
  34. @Override
  35. public SinkFunction createSinkFunction() {
  36. SinkFunction<Relation> sink = JdbcSink.sink(
  37. "INSERT INTO relation VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
  38. (statement, relation) -> {
  39. statement.setString(1, relation.getLayoutObjectId());
  40. statement.setString(2, relation.getAccompanyObjectId());
  41. statement.setLong(3, relation.getLayoutEventTime()/1000);
  42. statement.setLong(4, relation.getAccompanyEventTime()/1000);
  43. statement.setString(5, relation.getLayoutAddressId());
  44. statement.setString(6, relation.getAccompanyAddressId());
  45. statement.setString(7, relation.getLayoutObjectType());
  46. statement.setString(8, relation.getAccompanyObjectType());
  47. statement.setString(9, relation.getGid());
  48. statement.setString(10, relation.getLayoutRemark());
  49. statement.setString(11, relation.getAccompanyRemark());
  50. statement.setLong(12, System.currentTimeMillis()/1000);
  51. },
  52. JdbcExecutionOptions.builder()
  53. .withBatchSize(dataBatchSize)
  54. .withBatchIntervalMs(dataBatchIntervalMs)
  55. .withMaxRetries(5)
  56. .build(),
  57. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  58. .withUrl("jdbc:clickhouse://" + ip + ":" + port + "/" + database + "?autoReconnect=true&maxReconnects=5&initialTimeout=10&connectTimeout=30000&socketTimeout=30000")
  59. .withDriverName("com.clickhouse.jdbc.ClickHouseDriver")
  60. .withUsername(username)
  61. .withPassword(password)
  62. .build()
  63. );
  64. return sink;
  65. }
  66. }

configPrefix

用于输出源实例中获取需要的配置项。

启动命令

—MainClass {应用启动类全类名} —configPath {配置文件绝对路径} —mainArgs {main参数}

�样例:

—MainClass com.zxelec.demo.HelloWorldApplication —configPath /Users/felix/config/application.properties —mainArgs {\”name\”:”felix”}

脚本架组件

Source

Kafka

  1. sourceFunction = KafkaSource.class
  2. /**
  3. * {configPrefix}.name=car_stream //kafka流名称,用于定义flink查看指标的标识名
  4. * {configPrefix}.bootstrap.servers=192.168.1.249:9092
  5. * {configPrefix}.topic=car_stream
  6. * {configPrefix}.group.id=demo01
  7. * {configPrefix}.auto.offset.reset=earliest
  8. * {configPrefix}.enable.auto.commit=false
  9. */

Transform

原生算子

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/

Config增加型算子

config增加型算子在原生算子的基础上,可获取应用程序启动时加载的配置信息,包括应用环境配置及业务配置。使用时继承相应的配置算子即可。使用样例如下:

  1. package com.zxelec.demo.process;
  2. import com.zxelec.scaffold.component.ConfigMapFunction;
  3. import org.apache.flink.api.java.utils.ParameterTool;
  4. public class DemoConfigMap extends ConfigMapFunction<String, String> {
  5. public DemoConfigMap(ParameterTool parameterTool, String configPrefix) {
  6. super(parameterTool, configPrefix);
  7. // parameterTool为应用程序配置容器
  8. // configPrefix为算子流程定义时的名称
  9. }
  10. @Override
  11. public String map(String o) throws Exception {
  12. return null;
  13. }
  14. }

目前脚手架已支持的增加型算子有:

ConfigFilterFunction

�ConfigFlatMapFunction

�ConfigKeyedCoProcessFunction

�ConfigKeyedProcessFunction

�ConfigKeySelector

�ConfigMapFunction

�ConfigProcessJoinFunction

�ConfigSerializableTimestampAssigner

ConfigSinkFunction�

Sink

等待更新…

环境配置项

  1. # 应用程序名称
  2. app.name=HelloWorld
  3. # checkpoint触发时间间隔
  4. env.checkpoint.interval=30
  5. # checkpoint存储目录
  6. env.checkpoint.directory=file:///Users/reality/Desktop/ck
  7. # 失败重试次数
  8. env.restart.attempts=3
  9. # 失败重试间隔
  10. env.restart.delay.interval=10
  11. # 等待更新...