场景

有一个实时监控程序,需要根据动态的规则清洗数据,且可以接受规则定期更新。

Flink通常被用来处理流式数据,有着众多的应用场景,比方说实时的ETL、检测报警等业务场景。这些场景通常会涉及到规则的更新,比如对解析规则和报警规则进行更改后,流任务应能够实时感知到,并用新的规则继续检测,避免因为规则更改而重启任务造成的开销。一般来说流式任务的重启是比较重的。

概念介绍

广播变量

广播变量通常被运用到以下场景中:一个流中的一些数据需要被广播到所有的下游任务,被下游任务缓存在本地并用于处理另一个流上的所有传入数据。例如,一个低吞吐量的流包含了一组规则,我们希望根据这些规则对另一个流的所有数据进行检测。因此,广播变量(broadcast state)和其他的state相比有以下不同:(1)目前只支持map格式(2)算子需要同时包含广播流和普通的数据流才可用(3)一个算子可以使用多个广播变量并用名称进行区分

数据流

1.日志实时流

  1. FlinkKafkaConsumer010<String> dataStream01 = KafkaConnect.getKafkaConsumer(params, "xxx");

2.Mysql数据流

其中 MonitorRule 为 Java Bean,用于接受规则数据。(规则数据为 1 条,则直接使用一个对象接受即刻。)

  1. DataStream<MonitorRule> monitorRuleDataStream = env.addSource(new MonitorRuleSource());

通过上面的代码可以看出,这里使用了自定义的 Source。自定义的Source,继承RichSourceFunction,重写函数。在open函数中初始化数据库连接池。在run函数中monitorRuleSourceContext.collect()出去,代码如下:

  1. public class monitorRuleSource extends RichSourceFunction<monitorRule> {
  2. private transient DruidDataSource dataSource = null;
  3. private volatile boolean isRunning = true;
  4. @Override
  5. public void open(Configuration parameters) throws Exception {
  6. PropertiesUtils propertiesUtils = new PropertiesUtils();
  7. String configPath = propertiesUtils.getConfigPath("application.properties");
  8. Properties p = propertiesUtils.getProperties(configPath,"sink.properties");
  9. //Mysql数据连接池
  10. dataSource = MysqlDataSource.getMysqlSource(p);
  11. super.open(parameters);
  12. }
  13. @Override
  14. public void run(SourceContext<monitorRule> monitorRuleSourceContext){
  15. try{
  16. while (isRunning){
  17. monitorRule monitorRule = new monitorRule();
  18. String sql = "select max_game_round,bug_game_round,bug_times,kill_times,bug_stop_start_game_round from xxx";
  19. Connection connection = dataSource.getConnection();
  20. PreparedStatement preparedStatement = connection.prepareStatement(sql);
  21. ResultSet resultSet = preparedStatement.executeQuery();
  22. while (resultSet.next()){
  23. monitorRule.setMaxGameRound(resultSet.getInt("max_game_round"));
  24. monitorRule.setBugGameRound(resultSet.getInt("bug_game_round"));
  25. monitorRule.setBugTimes(resultSet.getInt("bug_times"));
  26. monitorRule.setKillTimes(resultSet.getInt("kill_times"));
  27. monitorRule.setBugStopStartGameRound(resultSet.getInt("bug_stop_start_game_round"));
  28. }
  29. // 返回数据
  30. monitorRuleSourceContext.collect(monitorRule);
  31. //每隔一个小时执行一次查询
  32. Thread.sleep(1000*3600);
  33. }
  34. }catch (Exception e){
  35. System.out.println("Mysql data update error.."+e.getMessage());
  36. }
  37. }
  38. @Override
  39. public void cancel() {
  40. isRunning = false;
  41. }
  42. }

广播

参考文章

https://zhuanlan.zhihu.com/p/105304256?utm_source=ZHShareTargetIDMore