Background

1. The customer's business system is old and cannot feed real-time messages into a real-time computation pipeline. How do we make sure that rows inserted or updated in MySQL reach Flink in real time for computation?

Solution

[Architecture diagram: MySQL binlog → Maxwell → Kafka → Flink → MySQL]

After a round of research, we found that both Maxwell and Canal can monitor the MySQL binlog and forward changes to Kafka.

- We chose Maxwell to monitor the binlog.
- Maxwell: https://github.com/zendesk/maxwell
- Maxwell is an application that reads the MySQL binlog and writes row updates as JSON to Kafka, Kinesis, or other streaming platforms (a sample event is shown after this list). Its operational overhead is low: it needs nothing but MySQL and somewhere to write to. Common use cases include ETL, cache building/expiry, metrics collection, search indexing, and inter-service communication. It gives you some of the benefits of event sourcing without having to rebuild your entire platform.
- As the diagram above shows, the overall flow is: Maxwell monitors the MySQL binlog and sends inserted or updated rows as JSON to a Kafka topic; a Flink job consumes that topic and computes real-time metrics; finally, Flink sinks the results back to MySQL.
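For illustration, a Maxwell event has roughly the following shape (the database, table, and field values here are made up; real events from this setup appear in the testing section below):

```json
{"database":"shop","table":"orders","type":"insert","ts":1557453039,"xid":81408,"commit":true,"data":{"id":1,"amount":9.9}}
```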

Implementation

Configuring Maxwell to monitor the binlog

Installing Maxwell

```shell
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.22.0/maxwell-1.22.0.tar.gz \
    | tar zxvf -
cd maxwell-1.22.0
# or get the docker image:
docker pull zendesk/maxwell
# or on Mac OS X with homebrew installed:
brew install maxwell
```

Configuring MySQL

Server configuration: make sure server_id is set and row-based replication is on. In my.cnf:

```ini
[mysqld]
server_id=1
log-bin=master
binlog_format=row
```

Or, on a running server:

```sql
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;
```

Note: binlog_format is a session-based property; to fully convert to row-based replication you will need to shut down all active connections.
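To double-check that the running server has picked up these settings, you can inspect the variables:

```sql
mysql> SHOW VARIABLES LIKE 'binlog_format';    -- expect ROW
mysql> SHOW VARIABLES LIKE 'binlog_row_image'; -- expect FULL
```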
Permissions: Maxwell needs permission to store state in the database specified by its schema_database option (default: maxwell), plus replication permissions on the instance.

```sql
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;

# or, for a Maxwell running locally:
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
```
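You can verify the grants afterwards with:

```sql
mysql> SHOW GRANTS FOR 'maxwell'@'%';
```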

Running Maxwell

Writing output to the command line

```shell
$ cd $MAXWELL_HOME
$ bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout
```

Writing output to Kafka

Start Kafka and create the corresponding topic first.

```shell
$ cd $MAXWELL_HOME
$ bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell
```
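Instead of passing everything on the command line, Maxwell can also read a config.properties file from its working directory. A minimal sketch mirroring the flags above:

```properties
# config.properties -- minimal sketch mirroring the command-line flags above
user=maxwell
password=maxwell
host=127.0.0.1
producer=kafka
kafka.bootstrap.servers=localhost:9092
kafka_topic=maxwell
```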

Testing

1. First, create the following test table in the business MySQL database:

```sql
DROP TABLE IF EXISTS `maxwell_test`;
CREATE TABLE `maxwell_test` (
  `id` int(11) DEFAULT NULL,
  `name` varchar(255) CHARACTER SET latin1 DEFAULT NULL,
  `address` varchar(255) CHARACTER SET latin1 DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

SET FOREIGN_KEY_CHECKS = 1;
```

2. Start ZooKeeper and Kafka:

```shell
# Start ZooKeeper
$ cd $ZOOKEEPER_HOME
$ bin/zkServer.sh start
# Check that ZooKeeper came up
$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone
# ZooKeeper is running as a single node
# Start Kafka
$ cd $KAFKA_HOME
$ bin/kafka-server-start.sh config/server.properties
# Check the Kafka process
$ jps
22141 Kafka
# Create the test topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic maxwell
# List topics
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
maxwell
```
3. Start Maxwell:

```shell
# Start Maxwell, producing to Kafka
$ cd $MAXWELL_HOME
$ bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell
```
4. Change some data and check the results. Start a Kafka console consumer to watch Maxwell's output:

```shell
$ cd $KAFKA_HOME
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell
```
Insert a row into the MySQL test table with id=1, name='test', address='test'.
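For reference, the corresponding statement against the table created above:

```sql
INSERT INTO maxwell_test (id, name, address) VALUES (1, 'test', 'test');
```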

The Kafka consumer receives:

```json
{"database":"guiyi_tset","table":"maxwell_test","type":"insert","ts":1557453039,"xid":81408,"commit":true,"data":{"id":1,"name":"test","address":"test"}}
```

Now update the row, changing address from 'test' to 'hangzhou'.
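Again for reference:

```sql
UPDATE maxwell_test SET address = 'hangzhou' WHERE id = 1;
```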

The Kafka consumer receives:

```json
{"database":"guiyi_tset","table":"maxwell_test","type":"update","ts":1557453200,"xid":81624,"commit":true,"data":{"id":1,"name":"test","address":"hangzhou"},"old":{"address":"test"}}
```

At this point, the first half of the pipeline (Maxwell monitoring the MySQL binlog and publishing changes to Kafka) works end to end.

Writing the Flink program

Creating the result table in the application database

Create the application table that Flink will sink its results into.

```sql
DROP TABLE IF EXISTS `bas_tech_charge_station`;
CREATE TABLE `bas_tech_charge_station` (
  `StationID` int(255) NOT NULL COMMENT 'charging station ID',
  `charge_num` float DEFAULT NULL COMMENT 'accumulated charge amount',
  `ctime` datetime DEFAULT NULL COMMENT 'creation time',
  `mtime` datetime DEFAULT NULL COMMENT 'modification time'
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
SET FOREIGN_KEY_CHECKS = 1;
```

Entity class

```java
package com.dtwave.binlog;

/**
 * @author : 恋晨
 * Date : 2019/5/9 11:37 AM
 */
public class bas_tech_charge_station {

    private int StationID;
    private float charge_num;
    private String ctime;
    private String mtime;

    public bas_tech_charge_station() {
    }

    public bas_tech_charge_station(int stationID, float charge_num, String ctime, String mtime) {
        StationID = stationID;
        this.charge_num = charge_num;
        this.ctime = ctime;
        this.mtime = mtime;
    }

    public int getStationID() {
        return StationID;
    }

    public void setStationID(int stationID) {
        StationID = stationID;
    }

    public float getCharge_num() {
        return charge_num;
    }

    public void setCharge_num(float charge_num) {
        this.charge_num = charge_num;
    }

    public String getCtime() {
        return ctime;
    }

    public void setCtime(String ctime) {
        this.ctime = ctime;
    }

    public String getMtime() {
        return mtime;
    }

    public void setMtime(String mtime) {
        this.mtime = mtime;
    }

    @Override
    public String toString() {
        return "bas_tech_charge_station{" +
                "StationID=" + StationID +
                ", charge_num=" + charge_num +
                ", ctime=" + ctime +
                ", mtime=" + mtime +
                "}";
    }
}
```

Custom sink

Since Flink ships no ready-made MySQL sink class, we implement one ourselves: a SinkFunction that extends RichSinkFunction and overrides its lifecycle methods. open() creates the JDBC connection once, invoke() inserts each record into MySQL, and close() releases the resources.

```java
package com.dtwave.binlog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author : 恋晨
 * Date : 2019/5/9 11:35 AM
 */
public class SinkToMysql extends RichSinkFunction<bas_tech_charge_station> {

    private PreparedStatement ps;
    private Connection connection;

    /**
     * Open the connection in open() so that we don't have to
     * create and release a connection on every invoke() call.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into bas_tech_charge_station(StationID, charge_num, ctime, mtime) values(?, ?, ?, ?)";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement and connection to release resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every record.
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(bas_tech_charge_station value, Context context) throws Exception {
        // Bind the fields and execute the insert
        ps.setInt(1, value.getStationID());
        ps.setFloat(2, value.getCharge_num());
        ps.setString(3, value.getCtime());
        ps.setString(4, value.getMtime());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8",
                    "root",
                    "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connect has exception, msg=" + e.getMessage());
        }
        return con;
    }
}
```
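Note that invoke() issues a plain INSERT for every aggregate update that Flink emits, so the result table collects one row per change. If you would rather keep a single row per station, one option (a sketch, assuming StationID is made the table's primary key, which the DDL above does not do) is to prepare an upsert in open() instead:

```java
// Hypothetical upsert variant of the statement prepared in open(),
// assuming StationID has been made the primary key of the table.
String sql = "insert into bas_tech_charge_station(StationID, charge_num, ctime, mtime) "
        + "values(?, ?, ?, ?) "
        + "on duplicate key update charge_num = values(charge_num), mtime = values(mtime)";
```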

Flink main program

```java
package com.dtwave.binlog;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * @author : 恋晨
 * Date : 2019/5/8 8:14 PM
 */
public class AnalyzeBinlog {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.98.227.169:9092");
        props.put("zookeeper.connect", "47.98.227.169:2181");
        props.put("group.id", "test");
        // key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Create the Kafka source:
         * build a data stream from the topic "maxwell".
         */
        DataStreamSource<String> kafkaSource = env.addSource(new FlinkKafkaConsumer011<String>(
                "maxwell",
                new SimpleStringSchema(),
                props
        ));

        /*
         * Custom filter: drop updates and inserts from unrelated tables,
         * since Maxwell monitors the whole MySQL instance.
         * Custom map: extract ConnectorID from each record as the key and
         * TotalPower as the value, packed into a Tuple2; then key by
         * station and sum the power.
         */
        DataStream<Tuple2<Long, Float>> filterStream = kafkaSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.matches("^\\{\"database\":\"guiyi_tset\",\"table\":\"ChargeOrder\".*");
            }
        }).map(new MapFunction<String, Tuple2<Long, Float>>() {
            @Override
            public Tuple2<Long, Float> map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                JSONObject data = JSONObject.parseObject(jsonObject.get("data").toString());
                return new Tuple2<>(Long.valueOf(data.get("ConnectorID").toString()),
                        Float.valueOf(data.get("TotalPower").toString()));
            }
        }).keyBy(0).sum(1);

        /*
         * Custom map: convert each Tuple2 result into a
         * bas_tech_charge_station entity so it can be sinked.
         */
        DataStream<bas_tech_charge_station> outStream = filterStream.map(new MapFunction<Tuple2<Long, Float>, bas_tech_charge_station>() {
            @Override
            public bas_tech_charge_station map(Tuple2<Long, Float> value) throws Exception {
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                return new bas_tech_charge_station(Integer.parseInt(value.f0.toString()), value.f1,
                        df.format(new Date()), df.format(new Date()));
            }
        });

        outStream.addSink(new SinkToMysql());
        env.execute("sink maxwell to mysql");
    }
}
```
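For reference, building this job needs roughly the following dependencies. The artifact names match the imports above, but the version numbers are assumptions from the era of this post and should be adapted to your environment:

```xml
<!-- Sketch of the required dependencies; versions are assumptions. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.57</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
```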

Results

Start Maxwell, ZooKeeper, and Kafka, then run the Flink main program and insert data into the ChargeOrder table.
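For example, a hypothetical order row (the real ChargeOrder schema has more columns; the Flink job only reads ConnectorID and TotalPower):

```sql
INSERT INTO ChargeOrder (ConnectorID, TotalPower) VALUES (1001, 12.5);
```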
Check the result table:

[Screenshot: contents of bas_tech_charge_station]

The result table has correctly accumulated the charge amounts from the order table.