场景
有一个实时监控程序,需要根据动态的规则清洗数据,且可以接受规则定期更新。
Flink通常被用来处理流式数据,有着众多的应用场景,比方说实时的ETL、检测报警等业务场景。这些场景通常会涉及到规则的更新,比如对解析规则和报警规则进行更改后,流任务应能够实时感知到,并用新的规则继续检测,避免因为规则更改而重启任务造成的开销。一般来说流式任务的重启是比较重的。
概念介绍
广播变量
广播变量通常被运用到以下场景中:一个流中的一些数据需要被广播到所有的下游任务,被下游任务缓存在本地并用于处理另一个流上的所有传入数据。例如,一个低吞吐量的流包含了一组规则,我们希望根据这些规则对另一个流的所有数据进行检测。因此,广播变量(broadcast state)和其他的state相比有以下不同:(1)目前只支持map格式(2)算子需要同时包含广播流和普通的数据流才可用(3)一个算子可以使用多个广播变量并用名称进行区分
数据流
1.日志实时流
FlinkKafkaConsumer010<String> dataStream01 = KafkaConnect.getKafkaConsumer(params, "xxx");
2.Mysql数据流
其中 MonitorRule 为 Java Bean,用于接受规则数据。(规则数据为 1 条,则直接使用一个对象接受即刻。)
DataStream<MonitorRule> monitorRuleDataStream = env.addSource(new MonitorRuleSource());
通过上面的代码可以看出,这里使用了自定义的 Source。自定义的Source,继承RichSourceFunction,重写函数。在open函数中初始化数据库连接池。在run函数中monitorRuleSourceContext.collect()出去,代码如下:
public class monitorRuleSource extends RichSourceFunction<monitorRule> {
private transient DruidDataSource dataSource = null;
private volatile boolean isRunning = true;
@Override
public void open(Configuration parameters) throws Exception {
PropertiesUtils propertiesUtils = new PropertiesUtils();
String configPath = propertiesUtils.getConfigPath("application.properties");
Properties p = propertiesUtils.getProperties(configPath,"sink.properties");
//Mysql数据连接池
dataSource = MysqlDataSource.getMysqlSource(p);
super.open(parameters);
}
@Override
public void run(SourceContext<monitorRule> monitorRuleSourceContext){
try{
while (isRunning){
monitorRule monitorRule = new monitorRule();
String sql = "select max_game_round,bug_game_round,bug_times,kill_times,bug_stop_start_game_round from xxx";
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()){
monitorRule.setMaxGameRound(resultSet.getInt("max_game_round"));
monitorRule.setBugGameRound(resultSet.getInt("bug_game_round"));
monitorRule.setBugTimes(resultSet.getInt("bug_times"));
monitorRule.setKillTimes(resultSet.getInt("kill_times"));
monitorRule.setBugStopStartGameRound(resultSet.getInt("bug_stop_start_game_round"));
}
// 返回数据
monitorRuleSourceContext.collect(monitorRule);
//每隔一个小时执行一次查询
Thread.sleep(1000*3600);
}
}catch (Exception e){
System.out.println("Mysql data update error.."+e.getMessage());
}
}
@Override
public void cancel() {
isRunning = false;
}
}
广播
参考文章
https://zhuanlan.zhihu.com/p/105304256?utm_source=ZHShareTargetIDMore