场景
有一个实时监控程序,需要根据动态的规则清洗数据,且可以接受规则定期更新。
Flink通常被用来处理流式数据,有着众多的应用场景,比方说实时的ETL、检测报警等业务场景。这些场景通常会涉及到规则的更新,比如对解析规则和报警规则进行更改后,流任务应能够实时感知到,并用新的规则继续检测,避免因为规则更改而重启任务造成的开销。一般来说流式任务的重启是比较重的。
概念介绍
广播变量
广播变量通常被运用到以下场景中:一个流中的一些数据需要被广播到所有的下游任务,被下游任务缓存在本地并用于处理另一个流上的所有传入数据。例如,一个低吞吐量的流包含了一组规则,我们希望根据这些规则对另一个流的所有数据进行检测。因此,广播变量(broadcast state)和其他的state相比有以下不同:(1)目前只支持map格式(2)算子需要同时包含广播流和普通的数据流才可用(3)一个算子可以使用多个广播变量并用名称进行区分
数据流
1.日志实时流
FlinkKafkaConsumer010<String> dataStream01 = KafkaConnect.getKafkaConsumer(params, "xxx");
2.Mysql数据流
其中 MonitorRule 为 Java Bean,用于接受规则数据。(规则数据为 1 条,则直接使用一个对象接受即刻。)
DataStream<MonitorRule> monitorRuleDataStream = env.addSource(new MonitorRuleSource());
通过上面的代码可以看出,这里使用了自定义的 Source。自定义的Source,继承RichSourceFunction,重写函数。在open函数中初始化数据库连接池。在run函数中monitorRuleSourceContext.collect()出去,代码如下:
public class monitorRuleSource extends RichSourceFunction<monitorRule> {private transient DruidDataSource dataSource = null;private volatile boolean isRunning = true;@Overridepublic void open(Configuration parameters) throws Exception {PropertiesUtils propertiesUtils = new PropertiesUtils();String configPath = propertiesUtils.getConfigPath("application.properties");Properties p = propertiesUtils.getProperties(configPath,"sink.properties");//Mysql数据连接池dataSource = MysqlDataSource.getMysqlSource(p);super.open(parameters);}@Overridepublic void run(SourceContext<monitorRule> monitorRuleSourceContext){try{while (isRunning){monitorRule monitorRule = new monitorRule();String sql = "select max_game_round,bug_game_round,bug_times,kill_times,bug_stop_start_game_round from xxx";Connection connection = dataSource.getConnection();PreparedStatement preparedStatement = connection.prepareStatement(sql);ResultSet resultSet = preparedStatement.executeQuery();while (resultSet.next()){monitorRule.setMaxGameRound(resultSet.getInt("max_game_round"));monitorRule.setBugGameRound(resultSet.getInt("bug_game_round"));monitorRule.setBugTimes(resultSet.getInt("bug_times"));monitorRule.setKillTimes(resultSet.getInt("kill_times"));monitorRule.setBugStopStartGameRound(resultSet.getInt("bug_stop_start_game_round"));}// 返回数据monitorRuleSourceContext.collect(monitorRule);//每隔一个小时执行一次查询Thread.sleep(1000*3600);}}catch (Exception e){System.out.println("Mysql data update error.."+e.getMessage());}}@Overridepublic void cancel() {isRunning = false;}}
广播
参考文章
https://zhuanlan.zhihu.com/p/105304256?utm_source=ZHShareTargetIDMore
