更新记录

时间 更新内容 更新人
2022-10-31 文档创建 付飞
2022-11-01 增加默认mainArgsConfig配置类解析 付飞
2022-11-02 1. 配置项必要与非必要参数识别,并增加非必要默认值
2. 增加Jdbc、Print、Kafka三种Sink组件支持
3. 增加通过main参数Flow构建流程定义并启动相应应用
4. 其它细节调整
付飞
2022-11-07 1. 支持kafka source序列化定义及类型支持
2. 增加shareStreamName属性定义
付飞
2022-11-09 1. 增加merge…算子优化支持
2. 更新部分算子处理逻辑及部分bug
付飞
2022-11-10 增加脚手架编排样例(持续更新) 付飞
2022-11-15 增加JsonMapper增加算子(算子转换规则将持续更新) 付飞
2022-12-19 增加transform、sink原生代码解析支持 付飞
2023-01-04 增加checkpoint是否开启配置
增加kafka source checkpoint 提交 offset 配置
付飞
2023-01-16 增加任务全局并行度设置
增加本地调试开启webui
付飞

Hello World

hello world案例中,我们将消费kafka数据(sink),并计算每条数据的长度(transform),而后打印至控制台(sink)。

1. 引入脚本架依赖

  1. <dependency>
  2. <groupId>com.zxelec</groupId>
  3. <artifactId>flink-scaffold</artifactId>
  4. <version>1.1</version>
  5. </dependency>

2. 定义启动类

  1. package com.zxelec.demo;
  2. import com.zxelec.demo.process.SizeMap;
  3. import com.zxelec.scaffold.annotation.Flow;
  4. import com.zxelec.scaffold.annotation.Sink;
  5. import com.zxelec.scaffold.annotation.Source;
  6. import com.zxelec.scaffold.annotation.Stream;
  7. import com.zxelec.scaffold.annotation.Transform;
  8. import com.zxelec.scaffold.app.Application;
  9. import org.apache.flink.connector.kafka.source.KafkaSource;
  10. import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
  11. @Flow(
  12. streams = {
  13. @Stream(
  14. sources = {
  15. @Source(
  16. sourceFunction = KafkaSource.class,
  17. configPrefix = "demo"
  18. )
  19. },
  20. transforms = {
  21. @Transform(
  22. transformFunction = SizeMap.class
  23. )
  24. },
  25. sinks = {
  26. @Sink(
  27. sinkFunction = PrintSinkFunction.class
  28. )
  29. }
  30. )
  31. }
  32. )
  33. public class HelloWorldApplication extends Application {
  34. }

SizeMap.java

  1. package com.zxelec.demo.process;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. public class SizeMap implements MapFunction<String, Integer> {
  4. @Override
  5. public Integer map(String json) throws Exception {
  6. return json.length();
  7. }
  8. }

3. 定义配置文件

  1. # 应用名称
  2. app.name=HelloWorld
  3. # 基础环境配置
  4. env.checkpoint.enable=true
  5. env.checkpoint.interval=30
  6. env.checkpoint.directory=file:///Users/felix/Desktop/ck
  7. env.restart.attempts=3
  8. env.restart.delay.interval=10
  9. # kafka
  10. demo.name=car_stream
  11. demo.bootstrap.servers=192.168.1.249:9092
  12. demo.topic=car_stream
  13. demo.group.id=demo_group_01
  14. demo.auto.offset.reset=earliest
  15. demo.enable.auto.commit=false

4. 启动参数

  1. --MainClass com.zxelec.demo.HelloWorldApplication --ConfigPath ${application.properties文件路径}

完成以上步骤即可启动一个读取kafka数据,统计每条数据长度,输出打印至控制台的Flink应用程序!

注解标签

@Flow

  1. @Flow(
  2. mainArgsConfig = <? extends MainArgsConfig.class>,
  3. streams = {
  4. @Stream(),
  5. ...
  6. }
  7. )
@Flow用于定义一个Flink应用程序。 #### mainArgsConfig(可选参数) 通过启动参数mainArgs传入参数值的解析类,将参数值放入应用程序配置容器ParameterTool中,供配置型增强算子(ConfigMapFunction等)获取做业务处理。 解析类需要实现MainArgsConfig接口,接口定义如下: java /** * @author: Felix * @description: 解析main args传参 * @date: 15:45 2022/10/25 * @version: 1.0 */ public interface MainArgsConfig { /** * 解析配置项 * @param args * @return */ HashMap<String, String> process(String mainArgs); } 备注:此版本脚手架已提供默认mainArgs参数解析类,可自动解析单层kv结构JSON格式数据,将其封闭入配置容器ParameterTool中。使用时无需显示增加 mainArgsConfig = MainArgsConfig.class 类似定义。
  1. {
  2. "name": "zhangsan",
  3. "age": 18
  4. }

streams(必需参数)

用于定义一个或多个Flink流处理节点@Stream(),脚本架解析时会根据定义顺序处理。当一个节点@Stream数据源依赖其它节点@Stream过程流时,请注意定义的顺序。

@Stream

  1. @Stream(
  2. sources = {
  3. @Source(),
  4. ...
  5. },
  6. transforms = {
  7. @Transform(),
  8. ...
  9. },
  10. sinks = {
  11. @Sink(),
  12. ...
  13. }
  14. )
@Stream用于定义一个Flink流处理节点 #### sources(必需参数) 用于定义一个或多个数据源@Source()

transforms(可选参数)

用于定义一个或多个数据转换算子@Transform()

sinks(可选参数)

用于定义一个或多个输出源@Sink()

@Source

  1. @Source(
  2. sourceFunction = <? extends sinkFunction>,
  3. configPrefix = "",
  4. streamName = "",
  5. shareStreamName = ""
  6. )
@Source用于定义一个具体的数据源 #### sourceFunction(可选参数) 获取所需数据源class类型。脚本架提供了大部分的常用数据源,数据源class + 配置项的规则使用。如需自定义数据源,实现Flink框架提供的SourceFunction接口或其抽象子类即可。 #### configPrefix(可选参数) 用于数据源实例中区分获取需要的配置项。(注意:自定义数据源暂未支持此功能) #### streamName(可选参数) 所需数据源为某Flink处理节点的中间流标识名称。(@Transform注解定义的streamNames属性值为中间流标识名) #### shareStreamName(可选参数) 共享数据流标识名,当其它流处理节点需要使用时定义。

@Transform

  1. @Transform(
  2. transformFunction = <? extends Function>,
  3. shareStreamNames = {},
  4. isEnable = {}
  5. )

@Transform用于定义一个转换算子。当定义多个数据源时,为了区分同一算子不同数据源不同的业务处理,请定义数据源时增加configPrefix属性,定义转换为配置增加型算子,及可区别不同数据源进行实例化。(可查看文档案例部分)

transformFunction(必需参数)

转换算子class类型。(见转换算子章节)

shareStreamNames(可选参数)

共享数据流标识名,当其它流处理节点需要使用时定义。

isEnable(可选参数)

在脚手架解析时,根据此配置读取应用配置${isEnable}.enable,判断是否启用该转换算子进行处理。(例:数据源为一个,isEable = {“map”},则此转换会读取应用配置项中 map.enable 判断是否进行流转换操作)

@Sink

  1. @Sink(
  2. sinkFunction = <? extends SinkFunction>,
  3. configPrefix = "",
  4. )
@Sink用于定义一个具体的输出源。

sinkFunction(必需参数)

获取所需输出源class类型。

configPrefix(可选参数)

用于输出源实例中区分获取需要的配置项。

启动命令

注解方式应用启动

RestApi启动时:entryClass为com.zxelec.scaffold.app.Application

—MainClass {应用启动类全类名} [—ConfigPath {配置文件绝对路径}] [—MainArgs {main参数}]

�样例:

—MainClass com.zxelec.demo.HelloWorldApplication —ConfigPath /Users/felix/config/application.properties —MainArgs {\”name\”:”felix”}

参数方式应用启动

RestApi启动时:entryClass为com.zxelec.scaffold.app.MainArgsApplication

—Flow {流程JSON} [—ConfigPath {配置文件绝对路径}] [—MainArgs {main参数}]

�样例:

—Flow {…} —ConfigPath /Users/felix/config/application.properties —MainArgs {\”name\”:”felix”}

流程JSON格式规则样例为:

  1. {
  2. "mainArgsConfig": "com.zxelec.relation.config.LoadMainArgsConfig"
  3. "streams": [
  4. {
  5. "sources": [
  6. {
  7. "sourceFunction": "org.apache.flink.connector.kafka.source.KafkaSource",
  8. "configPrefix": "stream1"
  9. },
  10. {
  11. "sourceFunction": "org.apache.flink.connector.kafka.source.KafkaSource",
  12. "configPrefix": "stream2"
  13. }
  14. ],
  15. "transforms": [
  16. {
  17. "transformFunction": "com.zxelec.relation.component.DataStandardMap"
  18. },
  19. {
  20. "transformFunction": "com.zxelec.relation.component.DataStandardFilter"
  21. }
  22. ],
  23. "sinks": [
  24. {
  25. "sinkFunction": "com.zxelec.relation.component.RelationSink",
  26. "configPrefix": "relation"
  27. }
  28. ]
  29. },
  30. {
  31. ...
  32. }
  33. ]
  34. }

脚本架组件

Source

Kafka

  1. sourceFunction = KafkaSource.class
  2. /**
  3. * [{configPrefix}.]kafka.name=car_stream // 必需
  4. * [{configPrefix}.]kafka.bootstrap.servers=192.168.1.249:9092 // 必需
  5. * [{configPrefix}.]kafka.topic=car_stream // 必需
  6. * [{configPrefix}.]kafka.group.id=demo01 // 必需
  7. * [{configPrefix}.]kafka.auto.offset.reset=earliest // 必需
  8. * [{configPrefix}.]kafka.enable.auto.commit=false // 必需
  9. * [{configPrefix}.]kafka.request.timeout.ms=10000 // 默认值:60000
  10. * [{configPrefix}.]kafka.session.timeout.ms=10000 // 默认值:40000
  11. * [{configPrefix}.]kafka.commit.offsets.on.checkpoint=false // 默认值false
  12. * --- kafka.deserialization.type start ---
  13. * kafka.deserialization.type: simple/KVTimestamp
  14. *
  15. * simple: 获取kafka value 封装为 String 类型数据
  16. * KVTimestamp: 获取kafka key/value/timestamp,并按顺序封装为 Tuple3<String, String, Long> 类型数据
  17. *
  18. * [{configPrefix}.]kafka.deserialization.type=simple // 默认值:simple
  19. * --- kafka.deserialization.type end ---
  20. * 等待更新...
  21. */

Transform

原生算子

Flink原生算子:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/

脚手架算子:

  1. /**
  2. * 组合keyBy及keyedProcess
  3. * @param <K>
  4. * @param <I>
  5. * @param <O>
  6. */
  7. public abstract class MergeKeyedProcessFunction<K, I, O> extends KeyedProcessFunction<K, I, O> {
  8. public abstract KeySelector<I, K> keyBy();
  9. }

Config增加型算子

config增加型算子在原生算子的基础上,可获取应用程序启动时加载的配置信息,包括应用环境配置及业务配置。使用时继承相应的配置算子即可。使用样例如下:

  1. package com.zxelec.demo.process;
  2. import com.zxelec.scaffold.component.ConfigMapFunction;
  3. import org.apache.flink.api.java.utils.ParameterTool;
  4. public class DemoConfigMap extends ConfigMapFunction<String, String> {
  5. public DemoConfigMap(ParameterTool parameterTool, String configPrefix) {
  6. super(parameterTool, configPrefix);
  7. // parameterTool为应用程序配置容器
  8. // configPrefix为算子处理的输入流定义的属性
  9. }
  10. @Override
  11. public String map(String o) throws Exception {
  12. return null;
  13. }
  14. }

目前脚手架已支持的增加型算子有:

ConfigFilterFunction

�ConfigFlatMapFunction

�ConfigKeyedCoProcessFunction

�ConfigKeyedProcessFunction

�ConfigKeySelector

�ConfigMapFunction

�ConfigProcessJoinFunction

�ConfigSerializableTimestampAssigner

ConfigSinkFunction�

ConfigMergeKeyedProcessFunction

JsonMapper

JsonMapper在ConfigMapFunction算子的基础上,以提供json映射文件或映射参数,自动完成json源字符串至json目标字符串的转换处理。

  1. @Flow(
  2. streams = {
  3. @Stream(
  4. sources = {
  5. @Source(
  6. sourceFunction = KafkaSource.class
  7. )
  8. },
  9. transforms = {
  10. @Transform(
  11. transformFunction = JsonMapper.class
  12. )
  13. },
  14. sinks = {
  15. @Sink(
  16. sinkFunction = PrintSinkFunction.class
  17. )
  18. }
  19. ),
  20. }
  21. )
  22. public class JsonApplication extends Application {
  23. }
  1. app.name=HelloWorld
  2. # 基础环境配置
  3. env.checkpoint.interval=30
  4. env.checkpoint.directory=file:///Users/reality/Desktop/ck
  5. env.restart.attempts=3
  6. env.restart.delay.interval=10
  7. # kafka
  8. kafka.name=imsi01
  9. kafka.bootstrap.servers=192.168.1.249:9092
  10. kafka.topic=imsi01
  11. kafka.group.id=demo00
  12. kafka.auto.offset.reset=earliest
  13. kafka.enable.auto.commit=false
  14. # 指定映射文件路径 (优先级低)
  15. json.mapper.file=/Users/reality/zx/project/flink-bootstrap/flink-bootstrap-component/flink-bootstrap-component-demo/imsi.json
  16. # 指定映射内容 (优先级高)
  17. json.mapper.param=

映射项说明: targetName:输出字段名 jsonPath:jsonPath语法提取输出值 transformType:输出值转换类型 transformRule: 转换规则
  1. [
  2. {
  3. "targetName": "objectId",
  4. "jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PlateNo,$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PlateColor",
  5. // join转换:提取多项源数据字段值,以transformRule.join字符串进行拼接。
  6. // 注意:多项源字段名以jsonPath语法用逗号拼接赋值于jsonPath字段
  7. "transformType": "join",
  8. "transformRule": {
  9. "join": "_"
  10. }
  11. },
  12. {
  13. "targetName": "trailTime",
  14. "jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PassTimer",
  15. // 将源数据时间字符串字段值,转换为其它日期格式字符串
  16. "transformType": "dateTime",
  17. "transformRule": {
  18. "sourceDateFormat": "yyyyMMddHHmmssSSS",
  19. "targetDateFormat": "yyyy-MM-dd HH:mm:ss"
  20. }
  21. },
  22. {
  23. // 提取源数据值,不做转换处理
  24. "targetName": "enterTime",
  25. "jsonPath": "$.Content.MotorVehicleObjectList.MotorVehicleObject[0].PassTime"
  26. },
  27. {
  28. "targetName": "trailRemark",
  29. // 新增字段并赋值transformRule.defaultValue
  30. "transformType": "default",
  31. "transformRule": {
  32. "defaultValue": "过车数据"
  33. }
  34. },
  35. {
  36. "targetName": "processTime",
  37. "jsonPath": "$.shotTime",
  38. // 将源数据时间戳时间格式化至指定日期格式字符串
  39. "transformType": "timeStamp",
  40. "transformRule": {
  41. "dateformat": "yyyy-MM-dd HH:mm:ss"
  42. }
  43. }
  44. // 等待更新...
  45. ]

NativeTransform

nativeTransform中可支持书写flink原生代码,以补充脚手架最大的业务灵活性。使用样例如下:�

  1. public class RelationNativeTransform implements NativeTransform {
  2. /**
  3. * 业务处理
  4. * @param dataStreamList Tuple<流,流的配置前缀>
  5. * @param config 配置容器
  6. * @return 返回形式与 @param dataStreamList 相同,请保存流及配置顺序。否则会造成后续业务处理异常
  7. */
  8. @Override
  9. public List<Tuple2<DataStream, String>> process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
  10. // TODO... 业务书写
  11. return null;
  12. }
  13. }

Sink

JdbcSink

  1. 实现JdbcSink接口并根据自己业务完成接口方法。
  1. package com.zxelec.demo.process;
  2. import com.zxelec.scaffold.component.sink.JdbcSink;
  3. import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
  4. /**
  5. * @author: Felix
  6. * @description: DemoJdbcSink
  7. * @date: 16:02 2022/11/2
  8. * @version: 1.0
  9. */
  10. public class DemoJdbcSink implements JdbcSink<Relation> {
  11. @Override
  12. public String prepareStatement() {
  13. return "INSERT INTO relation VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
  14. }
  15. @Override
  16. public JdbcStatementBuilder<Relation> setParameters() {
  17. return (statement, relation) -> {
  18. statement.setString(1, relation.getLayoutObjectId());
  19. statement.setString(2, relation.getAccompanyObjectId());
  20. statement.setLong(3, relation.getLayoutEventTime()/1000);
  21. statement.setLong(4, relation.getAccompanyEventTime()/1000);
  22. statement.setString(5, relation.getLayoutAddressId());
  23. statement.setString(6, relation.getAccompanyAddressId());
  24. statement.setString(7, relation.getLayoutObjectType());
  25. statement.setString(8, relation.getAccompanyObjectType());
  26. statement.setString(9, relation.getGid());
  27. statement.setString(10, relation.getLayoutRemark());
  28. statement.setString(11, relation.getAccompanyRemark());
  29. statement.setLong(12, System.currentTimeMillis()/1000);
  30. };
  31. }
  32. }
  1. 增加数据库相关配置项
  1. [{configPrefix}.]jdbc.url=jdbc:clickhouse://192.168.1.101:8123/demo
  2. [{configPrefix}.]jdbc.username=zhangsan
  3. [{configPrefix}.]jdbc.password=123456
  4. [{configPrefix}.]jdbc.driver.name=com.clickhouse.jdbc.ClickHouseDriver
  5. # jdbc.data.batch.size 默认值:400
  6. [{configPrefix}.]jdbc.data.batch.size=400
  7. # jdbc.data.batch.interval.ms 默认值200
  8. [{configPrefix}.]jdbc.data.batch.interval.ms=200
  1. 应用配置
  1. sinks = {
  2. @Sink(
  3. sinkFunction = DemoJdbcSink.class
  4. )
  5. }

PrintSinkFunction

  1. 应用配置:
  1. sinks = {
  2. @Sink(
  3. sinkFunction = PrintSinkFunction.class
  4. )
  5. }

�KafkaSink

  1. 增加kafka sink相关配置
  1. [{configPrefix}.]kafka.sink.brokers=192.168.1.1
  2. [{configPrefix}.]kafka.sink.topic=demo_topic
  3. # exactly-once/at-least-once 默认值:at-least-once
  4. [{configPrefix}.]kafka.sink.mode=at-least-once
  1. 应用配置
  1. sinks = {
  2. @Sink(
  3. sinkFunction = KafkaSink.class
  4. )
  5. }

NativeSink

nativeSink中可支持书写flink原生代码,以补充脚手架最大的业务灵活性。使用样例如下:�

  1. public class RelationNativeSink implements NativeSink {
  2. /**
  3. * 业务处理
  4. * @param dataStreamList dataStreamList Tuple<流,流的配置前缀>
  5. * @param config 配置容器
  6. */
  7. @Override
  8. public void process(List<Tuple2<DataStream, String>> dataStreamList, ParameterTool config) {
  9. // TODO... 业务书写
  10. }
  11. }

环境配置项

  1. # 应用程序名称(默认值:defaultStreamApp)
  2. app.name=HelloWorld
  3. # -------- parallelism config start --------
  4. # 任务全局并行度
  5. env.parallelism=1(默认值:1
  6. # -------- parallelism config end --------
  7. # -------- 本地调试开启webui config start --------
  8. env.local.web.ui.enable=true(默认值:false
  9. env.local.web.ui.port=8081(默认值:8081
  10. # 配置后maven依赖增加以下项即可,访问路径 localhost:8081
  11. # <dependency>
  12. # <groupId>org.apache.flink</groupId>
  13. # <artifactId>flink-runtime-web_2.12</artifactId>
  14. # <version>1.14.6</version>
  15. # </dependency>
  16. # -------- 本地调试开启webui config end --------
  17. # -------- checkpoint config start --------
  18. # 是否开启checkpoint(true、false。默认值:false)
  19. env.checkpoint.enable=true
  20. # 模式(exactly_once、at_least_once。默认值:at_least_once)
  21. env.checkpoint.mode=exactly_once
  22. # checkpoint触发时间间隔(单位:毫秒。默认值:600000)
  23. env.checkpoint.interval=60000
  24. # checkpoint存储目录(必需)
  25. env.checkpoint.directory=file:///Users/reality/Desktop/ck
  26. # 失败重试次数(默认值:3)
  27. env.restart.attempts=5
  28. # 失败重试间隔(单位:秒。默认值:10)
  29. env.restart.delay.interval=30
  30. # -------- checkpoint config end --------
  31. # 等待更新...

脚手架编程编排样例

样例1:简单的单流处理

  1. @Flow(
  2. streams = {
  3. @Stream(
  4. sources = {
  5. @Source(
  6. sourceFunction = KafkaSource.class
  7. )
  8. },
  9. transforms = {
  10. @Transform(
  11. transformFunction = DemoMap.class
  12. )
  13. },
  14. sinks = {
  15. @Sink(
  16. sinkFunction = PrintSinkFunction.class
  17. )
  18. }
  19. ),
  20. }
  21. )
  22. public class Demo1Application extends Application {
  23. }
  1. # kafka source config
  2. kafka.name=demo01
  3. kafka.bootstrap.servers=192.168.1.1:9092
  4. kafka.topic=demo01
  5. kafka.group.id=demo01
  6. kafka.auto.offset.reset=earliest
  7. kafka.enable.auto.commit=false
  1. public class DemoMap implements MapFunction<String, Integer> {
  2. @Override
  3. public Integer map(String value) throws Exception {
  4. // 业务处理...
  5. return value.length();
  6. }
  7. }

样例2:自定义解析Main Args,单流及其过程流转换处理

  1. @Flow(
  2. mainArgsConfig = CsvMainArgs.class,
  3. streams = {
  4. @Stream(
  5. sources = {
  6. @Source(
  7. sourceFunction = KafkaSource.class
  8. )
  9. },
  10. transforms = {
  11. @Transform(
  12. transformFunction = DemoMap.class,
  13. // 定义转换处理之后的流标识,作为其它任务数据源使用
  14. shareStreamNames = {"mapStream"}
  15. )
  16. },
  17. sinks = {
  18. @Sink(
  19. sinkFunction = PrintSinkFunction.class
  20. )
  21. }
  22. ),
  23. @Stream(
  24. sources = {
  25. @Source(
  26. // 引用之前任务某阶段的流标识,做为此任务的数据源
  27. streamName = "mapStream"
  28. )
  29. },
  30. transforms = {
  31. @Transform(
  32. transformFunction = SizeMap.class
  33. )
  34. },
  35. sinks = {
  36. @Sink(
  37. sinkFunction = PrintSinkFunction.class
  38. )
  39. }
  40. ),
  41. }
  42. )
  43. public class Demo2Application extends Application {
  44. }
  1. # kafka source config
  2. kafka.name=demo01
  3. kafka.bootstrap.servers=192.168.1.1:9092
  4. kafka.topic=demo01
  5. kafka.group.id=demo01
  6. kafka.auto.offset.reset=earliest
  7. kafka.enable.auto.commit=false
  1. public class CsvMainArgs implements MainArgsConfig {
  2. @Override
  3. public HashMap<String, String> process(String args) {
  4. // TODO 解析逻辑...
  5. return null;
  6. }
  7. }
  1. public class DemoMap implements MapFunction<String, Integer> {
  2. @Override
  3. public Integer map(String value) throws Exception {
  4. // 业务处理...
  5. return value.length();
  6. }
  7. }
  1. public class SizeMap implements MapFunction<String, Integer> {
  2. @Override
  3. public Integer map(String json) throws Exception {
  4. return json.length();
  5. }
  6. }

样例3:双流转换,并连接处理

  1. @Flow(
  2. mainArgsConfig = CsvMainArgs.class,
  3. streams = {
  4. // 任务1 获取订单流数据,转换并对流分组
  5. @Stream(
  6. sources = {
  7. @Source(
  8. sourceFunction = KafkaSource.class,
  9. configPrefix = "order"
  10. )
  11. },
  12. transforms = {
  13. @Transform(
  14. transformFunction = OrderInfo.class
  15. ),
  16. @Transform(
  17. transformFunction = OrderKey.class,
  18. // 定义此转换结果流标识,供之后任务作为数据源使用
  19. shareStreamNames = "orderKeyStream"
  20. )
  21. }
  22. ),
  23. // 任务2 获取用户流数据,转换并对流分组
  24. @Stream(
  25. sources = {
  26. @Source(
  27. sourceFunction = KafkaSource.class,
  28. configPrefix = "admin"
  29. )
  30. },
  31. transforms = {
  32. @Transform(
  33. transformFunction = AdminInfo.class
  34. ),
  35. @Transform(
  36. transformFunction = AdminKey.class,
  37. // 定义此转换结果流标识,供之后任务作为数据源使用
  38. shareStreamNames = "adminKeyStream"
  39. )
  40. }
  41. ),
  42. // 任务3 获取分组后的订单流及用户流,进行相同分组数据业务处理
  43. @Stream(
  44. sources = {
  45. @Source(
  46. streamName = "orderKeyStream"
  47. ),
  48. @Source(
  49. streamName = "adminKeyStream"
  50. )
  51. },
  52. transforms = {
  53. @Transform(
  54. transformFunction = OrderAndAdminConnect.class
  55. )
  56. },
  57. sinks = {
  58. @Sink(
  59. sinkFunction = PrintSinkFunction.class
  60. )
  61. }
  62. ),
  63. }
  64. )
  65. public class Demo3Application extends Application {
  66. }
  1. # order kafka source config
  2. kafka.name=order
  3. kafka.bootstrap.servers=192.168.1.1:9092
  4. kafka.topic=order_topic
  5. kafka.group.id=order01
  6. kafka.auto.offset.reset=earliest
  7. kafka.enable.auto.commit=false
  8. # admin kafka source config
  9. admin.kafka.name=admin
  10. admin.kafka.bootstrap.servers=192.168.1.1:9092
  11. admin.kafka.topic=admin_topic
  12. admin.kafka.group.id=admin01
  13. admin.kafka.auto.offset.reset=earliest
  14. admin.kafka.enable.auto.commit=false
  1. public class OrderInfo implements MapFunction<String, String> {
  2. @Override
  3. public String map(String order) throws Exception {
  4. // 业务处理...
  5. return order;
  6. }
  7. }
  1. public class OrderKey implements KeySelector<String, String> {
  2. @Override
  3. public String getKey(String order) throws Exception {
  4. // key 获取处理...
  5. return order;
  6. }
  7. }
  1. public class AdminInfo implements MapFunction<String, String> {
  2. @Override
  3. public String map(String admin) throws Exception {
  4. // 业务处理...
  5. return admin;
  6. }
  7. }
  1. public class AdminKey implements KeySelector<String, String> {
  2. @Override
  3. public String getKey(String admin) throws Exception {
  4. // key 获取处理...
  5. return admin;
  6. }
  7. }
  1. public class OrderAndAdminConnect extends KeyedCoProcessFunction<String, String, String, String> {
  2. @Override
  3. public void processElement1(String s, Context context, Collector<String> collector) throws Exception {
  4. // 订单流数据...
  5. }
  6. @Override
  7. public void processElement2(String s, Context context, Collector<String> collector) throws Exception {
  8. // 用户流数据...
  9. }
  10. }

样例4:多流转换,并输出至同一数据源