Background
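The customer's business system is old and cannot emit real-time messages for real-time computation. How can we make sure that rows inserted into or updated in MySQL are sent to Flink for computation in real time?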
Solution
After some research (on Baidu), we found that both Maxwell and Canal can monitor the MySQL binlog and send the changes to Kafka.
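- Here we chose Maxwell to monitor the binlog.
- Maxwell: https://github.com/zendesk/maxwell
- Maxwell is an application that reads MySQL binlogs and writes row updates as JSON to Kafka, Kinesis, or other streaming platforms. It has low operational overhead, requiring nothing but MySQL and a place to write to. Its common use cases include ETL, cache building/expiring, metrics collection, search indexing, and inter-service communication. Maxwell gives you some of the benefits of event sourcing without having to re-architect your entire platform.
- As shown in the figure above, the overall flow is: Maxwell monitors the MySQL binlog and sends inserted or updated rows as JSON to a Kafka topic; a Flink program consumes that topic and computes real-time metrics; finally, Flink sinks the results into MySQL.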
Hands-on walkthrough
Configuring Maxwell to monitor the binlog
Installing Maxwell
- Download the binary distribution: https://github.com/zendesk/maxwell/releases/download/v1.22.0/maxwell-1.22.0.tar.gz
- Source code and bug tracking are on GitHub: https://github.com/zendesk/maxwell
- Copy/paste into a terminal and run:

```shell
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.22.0/maxwell-1.22.0.tar.gz \
       | tar zxvf -
cd maxwell-1.22.0

# or get the docker image:
docker pull zendesk/maxwell

# or on Mac OS X with homebrew installed:
brew install maxwell
```
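Configuring MySQL
Server configuration: make sure `server_id` is set and row-based replication is enabled (edit `my.cnf`, e.g. with `vi my.cnf`):

```ini
[mysqld]
server_id=1
log-bin=master
binlog_format=row
```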
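Or, on a running server:

```sql
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;
```

Note: binlog_format is a session-based property. You will need to shut down all active connections to fully convert to row-based replication.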
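Before starting Maxwell it can help to confirm these settings on the server. The sketch below is not part of Maxwell; it assumes you have already read the rows of `SHOW VARIABLES` (for example over JDBC) into a `Map<String, String>`, and the class name `BinlogPrecheck` is hypothetical. It checks the same settings configured above: a non-zero `server_id`, binary logging enabled (`log_bin=ON`), `binlog_format=ROW` and `binlog_row_image=FULL`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks SHOW VARIABLES output against the binlog settings used above. */
class BinlogPrecheck {

    /** Returns a list of problems; an empty list means the settings match the setup above. */
    static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        // A replication source must have a non-zero server_id.
        if ("0".equals(vars.getOrDefault("server_id", "0").trim())) {
            problems.add("server_id is not set (must be non-zero)");
        }
        // log_bin reports ON when binary logging is enabled (log-bin in my.cnf).
        if (!"ON".equalsIgnoreCase(vars.getOrDefault("log_bin", "OFF"))) {
            problems.add("binary logging is disabled (log_bin=OFF)");
        }
        // Maxwell reads row-based events, so the format must be ROW.
        if (!"ROW".equalsIgnoreCase(vars.getOrDefault("binlog_format", ""))) {
            problems.add("binlog_format must be ROW");
        }
        // FULL row images log every column of the changed row, not only the modified ones.
        if (!"FULL".equalsIgnoreCase(vars.getOrDefault("binlog_row_image", ""))) {
            problems.add("binlog_row_image should be FULL");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of(
                "server_id", "1",
                "log_bin", "ON",
                "binlog_format", "ROW",
                "binlog_row_image", "FULL");
        System.out.println(problems(vars)); // prints []
    }
}
```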
Permissions: Maxwell needs permission to store its state in the database named by the `schema_database` option (default: `maxwell`), plus SELECT and replication privileges to read the binlog.

```sql
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;

# or, to run Maxwell locally:
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
```
Running Maxwell
Output to the command line (stdout):

```shell
$ cd $MAXWELL_HOME
$ bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout
```
Output to Kafka
Start Kafka and create the corresponding topic first.

```shell
$ cd $MAXWELL_HOME
$ bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell
```
Testing
1. First, create the following test table in the business MySQL database:

```sql
DROP TABLE IF EXISTS `maxwell_test`;
CREATE TABLE `maxwell_test` (
  `id` int(11) DEFAULT NULL,
  `name` varchar(255) CHARACTER SET latin1 DEFAULT NULL,
  `address` varchar(255) CHARACTER SET latin1 DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SET FOREIGN_KEY_CHECKS = 1;
```
2. Start ZooKeeper and Kafka:

```shell
# Start ZooKeeper
$ cd $ZOOKEEPER_HOME
$ bin/zkServer.sh start
# Check that ZooKeeper started correctly
$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone
# ZooKeeper is running as a single (standalone) node
# Start Kafka (runs in the foreground; use another terminal for the following commands)
$ cd $KAFKA_HOME
$ bin/kafka-server-start.sh config/server.properties
# Check the Kafka process
$ jps
22141 Kafka
# Create the test topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic maxwell
# List the topics
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
maxwell
```

3. Start Maxwell:

```shell
# Start Maxwell and send data to Kafka
$ cd $MAXWELL_HOME
$ bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell
```

4. Change some data and check the results:

```shell
# Start a Kafka console consumer to watch Maxwell's output
$ cd $KAFKA_HOME
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell

# Insert a row into the MySQL test table: id=1, name='test', address='test'
# The Kafka consumer receives:
{"database":"guiyi_tset","table":"maxwell_test","type":"insert","ts":1557453039,"xid":81408,"commit":true,"data":{"id":1,"name":"test","address":"test"}}

# Update the row in the test table (address changed to 'hangzhou')
# The Kafka consumer receives:
{"database":"guiyi_tset","table":"maxwell_test","type":"update","ts":1557453200,"xid":81624,"commit":true,"data":{"id":1,"name":"test","address":"hangzhou"},"old":{"address":"test"}}
```
At this point, the Maxwell part of the pipeline (monitoring the MySQL binlog and sending the changes to Kafka) works end to end.
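In the update event above, `data` holds the row after the change, while `old` holds only the previous values of the columns that changed. If a downstream job needs the complete row as it was before the update, it can be rebuilt by overlaying `old` on `data`. The sketch below assumes both JSON objects have already been parsed into `Map`s (JSON parsing is left out); `MaxwellRowImages` and `beforeImage` are hypothetical names, not part of Maxwell.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MaxwellRowImages {

    /** Rebuilds the row before an update: copy "data", then restore the changed columns from "old". */
    static Map<String, Object> beforeImage(Map<String, Object> data, Map<String, Object> old) {
        Map<String, Object> before = new LinkedHashMap<>(data); // copy, so the input is not mutated
        before.putAll(old); // "old" lists only changed columns, so only those are overwritten
        return before;
    }

    public static void main(String[] args) {
        // Values taken from the update event printed by the console consumer above.
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 1);
        data.put("name", "test");
        data.put("address", "hangzhou");
        Map<String, Object> old = Map.of("address", "test");

        System.out.println(beforeImage(data, old)); // prints {id=1, name=test, address=test}
    }
}
```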
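Writing the Flink program
Creating the application database table
Create a table in the application database to hold the results that Flink sinks.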
```sql
DROP TABLE IF EXISTS `bas_tech_charge_station`;
CREATE TABLE `bas_tech_charge_station` (
  `StationID` int(255) NOT NULL COMMENT 'charging station ID',
  `charge_num` float DEFAULT NULL COMMENT 'charge amount',
  `ctime` datetime DEFAULT NULL COMMENT 'creation time',
  `mtime` datetime DEFAULT NULL COMMENT 'modification time'
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
SET FOREIGN_KEY_CHECKS = 1;
```
Entity class
```java
package com.dtwave.binlog;

/**
 * @author : 恋晨
 * Date : 2019/5/9 11:37 AM
 * Purpose : POJO for one row of the result table bas_tech_charge_station
 */
public class bas_tech_charge_station {
    private int StationID;
    private float charge_num;
    private String ctime;
    private String mtime;

    // Flink's POJO serializer needs a public no-argument constructor
    public bas_tech_charge_station() {
    }

    public bas_tech_charge_station(int stationID, float charge_num, String ctime, String mtime) {
        StationID = stationID;
        this.charge_num = charge_num;
        this.ctime = ctime;
        this.mtime = mtime;
    }

    public int getStationID() {
        return StationID;
    }

    public void setStationID(int stationID) {
        StationID = stationID;
    }

    public float getCharge_num() {
        return charge_num;
    }

    public void setCharge_num(float charge_num) {
        this.charge_num = charge_num;
    }

    public String getCtime() {
        return ctime;
    }

    public void setCtime(String ctime) {
        this.ctime = ctime;
    }

    public String getMtime() {
        return mtime;
    }

    public void setMtime(String mtime) {
        this.mtime = mtime;
    }

    @Override
    public String toString() {
        return "bas_tech_charge_station{" +
                "StationID=" + StationID +
                ",charge_num=" + charge_num +
                ",ctime=" + ctime +
                ",mtime=" + mtime +
                "}";
    }
}
```
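Custom sink
The Flink setup used here has no ready-made class for sinking to MySQL, so we implement one ourselves. `SinkToMysql` is a SinkFunction: it extends RichSinkFunction, opens the connection in `open()`, inserts each record into MySQL in `invoke()`, and releases resources in `close()`.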
```java
package com.dtwave.binlog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author : 恋晨
 * Date : 2019/5/9 11:35 AM
 * Purpose : writes bas_tech_charge_station records into MySQL
 */
public class SinkToMysql extends RichSinkFunction<bas_tech_charge_station> {
    private PreparedStatement ps;
    private Connection connection;

    /**
     * Open the connection once in open(), so that we do not have to
     * create and release a connection on every invoke().
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into bas_tech_charge_station(StationID, charge_num, ctime, mtime) values(?, ?, ?, ?)";
        ps = connection.prepareStatement(sql);
    }

    /**
     * Release resources: close the statement first, then the connection.
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every record to insert.
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(bas_tech_charge_station value, Context context) throws Exception {
        // Bind the fields and execute the insert
        ps.setInt(1, value.getStationID());
        ps.setFloat(2, value.getCharge_num());
        ps.setString(3, value.getCtime());
        ps.setString(4, value.getMtime());
        ps.executeUpdate();
    }

    private static Connection getConnection() throws Exception {
        // Connector/J 5.x driver class; Connector/J 8.x uses com.mysql.cj.jdbc.Driver
        Class.forName("com.mysql.jdbc.Driver");
        // Let failures propagate so the job fails in open() instead of with a NullPointerException later
        return DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8",
                "root",
                "123456");
    }
}
```
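The sink above issues one `executeUpdate()` per record, i.e. one database round trip per event. A common refinement, which is not what this article does, is to buffer records and write them in batches. The sketch below shows only the buffering logic in plain Java; `BatchBuffer` is a hypothetical class, and the assumption is that in `SinkToMysql` the flush callback would bind each row with `ps.addBatch()` and then call `ps.executeBatch()`, with `invoke()` calling `add()` and `close()` calling `flush()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batching buffer: collects records and hands them to a flush callback in groups. */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Buffers one record and flushes automatically once the batch is full. */
    void add(T record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Hands any buffered records to the callback; does nothing when the buffer is empty. */
    void flush() {
        if (pending.isEmpty()) return;
        flusher.accept(new ArrayList<>(pending)); // pass a copy so the callback may keep it
        pending.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        BatchBuffer<Integer> buffer = new BatchBuffer<>(2, batches::add);
        for (int i = 1; i <= 5; i++) buffer.add(i);
        buffer.flush(); // as close() would: write the last, partial batch
        System.out.println(batches); // prints [[1, 2], [3, 4], [5]]
    }
}
```

The trade-off is durability: records still sitting in the buffer are lost if the job fails before a flush, so a production version would typically also flush when Flink takes a checkpoint (for example by implementing Flink's `CheckpointedFunction` and flushing in `snapshotState()`).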
Flink main program
```java
package com.dtwave.binlog;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * @author : 恋晨
 * Date : 2019/5/8 8:14 PM
 * Purpose : read Maxwell binlog events from Kafka, sum TotalPower per ConnectorID, sink to MySQL
 */
public class AnalyzeBinlog {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.98.227.169:9092");
        props.put("zookeeper.connect", "47.98.227.169:2181");
        props.put("group.id", "test");
        // key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Create the Kafka source:
         * build a data stream from the topic "maxwell".
         */
        DataStreamSource<String> kafkaSource = env.addSource(new FlinkKafkaConsumer011<String>(
                "maxwell",
                new SimpleStringSchema(),
                props
        ));

        /*
         * Custom filter: Maxwell monitors inserts and updates across the whole MySQL instance,
         * so drop events for unrelated tables and keep only guiyi_tset.ChargeOrder.
         * The regex relies on Maxwell writing "database" and "table" first, and it passes
         * updates as well as inserts, so an updated order's TotalPower is added again.
         * Custom map: take ConnectorID from each event's "data" as the key and TotalPower
         * as the value, build a Tuple2, then keep a running sum of the value per key.
         */
        DataStream<Tuple2<Long, Float>> filterStream = kafkaSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.matches("^\\{\"database\":\"guiyi_tset\",\"table\":\"ChargeOrder\".*");
            }
        }).map(new MapFunction<String, Tuple2<Long, Float>>() {
            @Override
            public Tuple2<Long, Float> map(String s) throws Exception {
                JSONObject data = JSON.parseObject(s).getJSONObject("data");
                return new Tuple2<>(Long.valueOf(data.get("ConnectorID").toString()),
                        Float.valueOf(data.get("TotalPower").toString()));
            }
        }).keyBy(0).sum(1);

        /*
         * Custom map: convert each Tuple2 above into a bas_tech_charge_station object for the sink.
         */
        DataStream<bas_tech_charge_station> outStream = filterStream.map(new MapFunction<Tuple2<Long, Float>, bas_tech_charge_station>() {
            @Override
            public bas_tech_charge_station map(Tuple2<Long, Float> value) throws Exception {
                // SimpleDateFormat is not thread-safe, so create a new one per call
                String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                // toIntExact fails loudly if a ConnectorID does not fit in an int
                return new bas_tech_charge_station(Math.toIntExact(value.f0), value.f1, now, now);
            }
        });

        outStream.addSink(new SinkToMysql());
        env.execute("sink maxwell to mysql");
    }
}
```
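To see what the job emits, the sketch below mimics the `filter`, `map` and `keyBy(0).sum(1)` steps in plain Java, without Flink: for every accepted event it adds the value to that key's running total and emits the new total. This is also why `SinkToMysql` writes one new row per incoming event rather than one row per station. The `ChangeEvent` class and the sample values are hypothetical; the real job reads these fields from Maxwell's JSON. The `insertsOnly` flag is an assumed variant, not part of the original job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningSumSketch {

    /** Hypothetical, already-parsed Maxwell event holding only the fields the job uses. */
    static final class ChangeEvent {
        final String database;
        final String table;
        final String type;
        final long connectorId;
        final float totalPower;

        ChangeEvent(String database, String table, String type, long connectorId, float totalPower) {
            this.database = database;
            this.table = table;
            this.type = type;
            this.connectorId = connectorId;
            this.totalPower = totalPower;
        }
    }

    /**
     * Emits one (ConnectorID, running total) pair per accepted event, like keyBy(0).sum(1).
     * insertsOnly = false mirrors the job's filter; true additionally drops update events.
     */
    static List<Map.Entry<Long, Float>> run(List<ChangeEvent> events, boolean insertsOnly) {
        Map<Long, Float> totals = new HashMap<>(); // per-key running totals
        List<Map.Entry<Long, Float>> emitted = new ArrayList<>();
        for (ChangeEvent e : events) {
            // filter(): keep only guiyi_tset.ChargeOrder
            if (!"guiyi_tset".equals(e.database) || !"ChargeOrder".equals(e.table)) continue;
            if (insertsOnly && !"insert".equals(e.type)) continue;
            // keyBy + sum: add to this key's total and emit the updated total
            float total = totals.merge(e.connectorId, e.totalPower, Float::sum);
            emitted.add(Map.entry(e.connectorId, total));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = List.of(
                new ChangeEvent("guiyi_tset", "ChargeOrder", "insert", 7L, 10.0f),
                new ChangeEvent("guiyi_tset", "maxwell_test", "insert", 1L, 99.0f), // other table
                new ChangeEvent("guiyi_tset", "ChargeOrder", "insert", 7L, 2.5f),
                new ChangeEvent("guiyi_tset", "ChargeOrder", "update", 7L, 2.5f)); // same order updated
        System.out.println(run(events, false)); // prints [7=10.0, 7=12.5, 7=15.0]
        System.out.println(run(events, true));  // prints [7=10.0, 7=12.5]
    }
}
```

The first run shows that an update to an existing order is summed a second time under the job's current filter; if only new orders should count, also checking that `type` is `insert` avoids this.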
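Results
Start Maxwell, ZooKeeper, and Kafka.
Then run the Flink main program and insert data into the ChargeOrder table.
Check the result table:
The result table successfully accumulates the charge amounts from the order table.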