上一篇文章我们介绍了mysql binlog的基本用法,那么这篇文章就带领大家在项目中优雅地使用mysql binlog
一、数据库配置
首先打开数据库的 binlog 支持,并且设置为 row 模式
查看是否支持binlog
mysql> show variables like '%bin%';
+--------------------------------------------+--------------------------------------+
| Variable_name | Value |
+--------------------------------------------+--------------------------------------+
| bind_address | * |
| binlog_cache_size | 32768 |
| binlog_checksum | CRC32 |
| binlog_direct_non_transactional_updates | OFF |
| binlog_error_action | ABORT_SERVER |
| binlog_format | ROW |
| binlog_group_commit_sync_delay | 0 |
| binlog_group_commit_sync_no_delay_count | 0 |
| binlog_gtid_simple_recovery | ON |
| binlog_max_flush_queue_time | 0 |
| binlog_order_commits | ON |
| binlog_row_image | FULL |
| binlog_rows_query_log_events | OFF |
| binlog_stmt_cache_size | 32768 |
| binlog_transaction_dependency_history_size | 25000 |
| binlog_transaction_dependency_tracking | COMMIT_ORDER |
| innodb_api_enable_binlog | OFF |
| innodb_locks_unsafe_for_binlog | OFF |
| log_bin | ON |
| log_bin_basename | /data/mysql/mysql-5.7/data/bin |
| log_bin_index | /data/mysql/mysql-5.7/data/bin.index |
| log_bin_trust_function_creators | ON |
| log_bin_use_v1_row_events | OFF |
| log_statements_unsafe_for_binlog | ON |
| max_binlog_cache_size | 18446744073709547520 |
| max_binlog_size | 1073741824 |
| max_binlog_stmt_cache_size | 18446744073709547520 |
| sql_log_bin | ON |
| sync_binlog | 1 |
+--------------------------------------------+--------------------------------------+
29 rows in set (0.17 sec)
如果不支持,可通过在my.cnf中配置log-bin打开binlog支持,然后重启数据库再次查看是否支持
log-bin=mysql-bin
相关变量
-- Binlog 开关变量
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.30 sec)
-- Binlog 日志的格式
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
二、创建项目
我们创建的是一个 springboot 的项目,不了解 springboot 的可以网上先了解下
然后我们引入 binlog 的 java 包
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
然后创建监听处理器。项目中的各个监听器会注册到这个处理器上,处理器负责监听这些已注册的数据库表上的数据操作:
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Binlog event handler: receives raw binlog events from {@link BinaryLogClient} and
 * dispatches row changes to the {@link BinLogListener} registered for the affected table.
 *
 * <p>Row events only carry a numeric table id; the database/table names come from the
 * TABLE_MAP event that precedes them. The handler therefore remembers the TABLE_MAP data
 * by table id and resolves every row event through it.
 *
 * @author clown
 * @version 1.0.0
 */
@Component
@Slf4j
public class BinlogHandler implements BinaryLogClient.EventListener {

    /** Registered listeners, keyed by {@code database:table}. */
    private final Map<String, BinLogListener> listenerMap = new HashMap<>();

    /** TABLE_MAP data of tables that have a listener, keyed by binlog table id. */
    private final Map<Long, TableMapEventData> tableMapById = new HashMap<>();

    @Resource
    private BingLogTableHolder bingLogTableHolder;

    private String getKey(String dataBaseName, String tableName) {
        return dataBaseName + ":" + tableName;
    }

    /**
     * Registers a listener for one table and loads that table's column metadata.
     *
     * @param dataBaseName   database (schema) name
     * @param tableName      table name
     * @param binLogListener listener that receives the row changes of the table
     */
    public void register(String dataBaseName, String tableName, BinLogListener binLogListener) {
        log.info("binlog handler register: {}-{}", dataBaseName, tableName);
        this.listenerMap.put(getKey(dataBaseName, tableName), binLogListener);
        bingLogTableHolder.initTable(dataBaseName, tableName);
    }

    /**
     * Handles one binlog event. TABLE_MAP events update the table-id lookup; row mutation
     * events (write/update/delete, in any of their binlog versions) are converted to a
     * {@link BingLogRowData} and passed to the listener of the affected table. All other
     * events are ignored.
     *
     * @param event the event delivered by the binlog client
     */
    @Override
    public void onEvent(Event event) {
        final EventType eventType = event.getHeader().getEventType();
        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableMap = event.getData();
            rememberTable(tableMap);
            return;
        }
        // isRowMutation also accepts the EXT_* (v2) and PRE_GA_* row event types,
        // not only WRITE_ROWS / UPDATE_ROWS / DELETE_ROWS.
        if (!EventType.isRowMutation(eventType)) {
            return;
        }
        final EventData data = event.getData();
        final Long tableId = getTableId(data);
        final TableMapEventData tableMap = tableId == null ? null : tableMapById.get(tableId);
        if (tableMap == null) {
            // No TABLE_MAP remembered for this id: the table has no listener.
            log.debug("skip table id {}", tableId);
            return;
        }
        final String dataBaseName = tableMap.getDatabase();
        final String tableName = tableMap.getTable();
        // Find the listener interested in this table.
        final String key = getKey(dataBaseName, tableName);
        final BinLogListener listener = this.listenerMap.get(key);
        if (null == listener) {
            log.debug("skip {}", key);
            return;
        }
        log.info("trigger event: {}", eventType.name());
        try {
            BingLogRowData rowData = this.build(data, dataBaseName, tableName, listener);
            rowData.setType(eventType);
            rowData.setTableName(tableName);
            rowData.setDataBaseName(dataBaseName);
            listener.onEvent(rowData);
        } catch (Exception e) {
            // A failing listener must not stop the binlog client from processing further events.
            log.error("binlog handler error ,error message:{}", e.getMessage(), e);
        }
    }

    /**
     * Stores the TABLE_MAP data when a listener exists for the table; otherwise drops any
     * entry for that table id so the lookup only holds tables of interest.
     */
    private void rememberTable(TableMapEventData tableMap) {
        final String key = getKey(tableMap.getDatabase(), tableMap.getTable());
        if (this.listenerMap.containsKey(key)) {
            this.tableMapById.put(tableMap.getTableId(), tableMap);
        } else {
            this.tableMapById.remove(tableMap.getTableId());
            log.debug("skip {}", key);
        }
    }

    /** Returns the table id carried by a row event, or {@code null} for other event data. */
    private Long getTableId(EventData eventData) {
        if (eventData instanceof WriteRowsEventData) {
            return ((WriteRowsEventData) eventData).getTableId();
        }
        if (eventData instanceof UpdateRowsEventData) {
            return ((UpdateRowsEventData) eventData).getTableId();
        }
        if (eventData instanceof DeleteRowsEventData) {
            return ((DeleteRowsEventData) eventData).getTableId();
        }
        return null;
    }

    /**
     * Returns the row values exposed as "after" data: inserted rows, the new image of
     * updated rows, and (kept for backward compatibility) the rows of a delete event.
     */
    private List<Serializable[]> getAfterValues(EventData eventData) {
        if (eventData instanceof WriteRowsEventData) {
            return ((WriteRowsEventData) eventData).getRows();
        }
        if (eventData instanceof UpdateRowsEventData) {
            return ((UpdateRowsEventData) eventData).getRows()
                    .stream()
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
        }
        if (eventData instanceof DeleteRowsEventData) {
            return ((DeleteRowsEventData) eventData).getRows();
        }
        return Collections.emptyList();
    }

    /**
     * Returns the row values exposed as "before" data: the old image of updated rows and
     * the rows removed by a delete event. Inserts have no before image and yield {@code null}.
     */
    private List<Serializable[]> getBeforeValues(EventData eventData) {
        if (eventData instanceof WriteRowsEventData) {
            return null;
        }
        if (eventData instanceof UpdateRowsEventData) {
            return ((UpdateRowsEventData) eventData).getRows()
                    .stream()
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }
        if (eventData instanceof DeleteRowsEventData) {
            // The rows of a delete event are the state before the deletion.
            return ((DeleteRowsEventData) eventData).getRows();
        }
        return Collections.emptyList();
    }

    /**
     * Invokes the listener's raw per-type hook and converts the event rows into
     * column-name keyed maps, one map per row.
     *
     * @throws IllegalStateException if no column metadata is cached for the table
     */
    private BingLogRowData build(EventData data, String dataBaseName, String tableName,
                                 BinLogListener listener) {
        notifyRawHook(data, listener);

        final Map<Integer, String> columns = bingLogTableHolder.getTable(dataBaseName, tableName);
        if (columns == null) {
            throw new IllegalStateException(
                    "no column metadata for " + getKey(dataBaseName, tableName));
        }

        final BingLogRowData rowData = new BingLogRowData();
        final List<Serializable[]> beforeValues = getBeforeValues(data);
        final List<Serializable[]> afterValues = getAfterValues(data);
        final List<Map<String, Object>> afterMapList = new ArrayList<>(afterValues.size());
        final List<Map<String, Object>> beforeMapList = new ArrayList<>();

        for (int i = 0; i < afterValues.size(); i++) {
            // Exactly one map per row (not one per column).
            final Map<String, Object> afterMap = toColumnMap(columns, afterValues.get(i));
            afterMapList.add(afterMap);
            if (afterMap.containsKey("id")) {
                // With several rows in one event the id of the last row wins.
                rowData.setDataId(afterMap.get("id"));
            }
            if (beforeValues != null) {
                beforeMapList.add(toColumnMap(columns, beforeValues.get(i)));
            }
        }

        rowData.setAfter(afterMapList);
        if (beforeValues != null) {
            rowData.setBefore(beforeMapList);
        }
        return rowData;
    }

    /** Calls the listener hook matching the concrete row event type. */
    private void notifyRawHook(EventData data, BinLogListener listener) {
        if (data instanceof WriteRowsEventData) {
            listener.onWrite((WriteRowsEventData) data);
        } else if (data instanceof UpdateRowsEventData) {
            listener.onUpdate((UpdateRowsEventData) data);
        } else if (data instanceof DeleteRowsEventData) {
            listener.onDelete((DeleteRowsEventData) data);
        }
    }

    /**
     * Maps one row's values to column names. The value at array index {@code ix} is looked
     * up under position {@code ix + 1}; values without a known column name are skipped.
     */
    private Map<String, Object> toColumnMap(Map<Integer, String> columns, Serializable[] values) {
        final Map<String, Object> row = new HashMap<>();
        for (int ix = 0; ix < values.length; ix++) {
            final String colName = columns.get(ix + 1);
            if (colName != null) {
                row.put(colName, values[ix]);
            }
        }
        return row;
    }
}
上面是 binlog 的处理器,其中 register 方法是别的监听器注册的方法,当别的监听器注册到这个处理器上后,这个处理器就可以处理那些对应数据库数据更新监听,并且初始化其监听的数据库对应的表的字段数据
onEvent 方法则是 BinaryLogClient.EventListener 接口的实现:当 BinaryLogClient 监听到事件时会回调此方法,此方法获取监听到的数据,处理后分发给对应的监听器,由对应的监听器处理各自的数据变动
下面是对应的 binlog 数据库表字段监听的初始化类
import lombok.Getter;
import lombok.Setter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Holds, for every initialised table, the mapping from a column's ordinal position
 * to its column name, as read from {@code information_schema.columns}.
 *
 * @author clown
 * @version 1.0.0
 */
@Component
public class BingLogTableHolder {

    private static final String INIT_SQL = "select table_schema, table_name, column_name, " +
            "ordinal_position from information_schema.columns where table_schema = ? and table_name = ?";

    /** Column maps (position to name), keyed by {@code database:table}. */
    private static final Map<String, Map<Integer, String>> TABLE_COLUMN_MAP = new HashMap<>();

    @Resource
    private JdbcTemplate jdbcTemplate;

    /**
     * Queries the column metadata of the given table and caches it, replacing any
     * previously cached entry for the same table.
     *
     * @param databaseName schema name used as {@code table_schema} filter
     * @param tableName    table name used as {@code table_name} filter
     */
    public void initTable(String databaseName, String tableName) {
        final Object[] args = new Object[]{databaseName, tableName};
        final List<TableColumn> columns =
                jdbcTemplate.query(INIT_SQL, args, (rs, rowNum) -> new TableColumn().mapRow(rs, rowNum));
        final Map<Integer, String> nameByPosition = columns.stream()
                .collect(Collectors.toMap(TableColumn::getPosition, TableColumn::getColumnName));
        TABLE_COLUMN_MAP.put(cacheKey(databaseName, tableName), nameByPosition);
    }

    /**
     * Returns the cached position-to-name map of a table.
     *
     * @return the cached map, or {@code null} when the table was never initialised
     */
    public Map<Integer, String> getTable(String databaseName, String tableName) {
        return TABLE_COLUMN_MAP.get(cacheKey(databaseName, tableName));
    }

    /** Builds the cache key {@code database:table}. */
    private static String cacheKey(String databaseName, String tableName) {
        return databaseName + ":" + tableName;
    }

    /**
     * One row of the metadata query; also acts as its own {@link RowMapper} by filling
     * itself from the current result-set row.
     */
    @Getter
    @Setter
    public static class TableColumn implements RowMapper<TableColumn> {

        private String columnName;
        private Integer position;

        @Override
        public TableColumn mapRow(ResultSet rs, int i) throws SQLException {
            this.columnName = rs.getString("column_name");
            this.position = rs.getInt("ordinal_position");
            return this;
        }
    }
}
上面的类比较简单,核心就两个方法,initTable 就是根据传入的数据库名称和表名称,将表中各字段的名称及其在表中的位置(position)查询出来;因为 binlog 只能获取到数据在表中所处的第n个字段,所以需要这样处理
getTable就是根据传入的数据库名称和表名称获取当前表中字段的下标
接着就是监听器了
首先是监听器的接口
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
/**
* @author clown
* @version 1.0.0
* @ClassName BinLogListener
* @Description bin
*/
public interface BinLogListener {
/**
* 将监听器注册到处理器中
*/
void register(BinlogHandler binlogHandler);
/**
* 当触发这些事件的时候
*
* @param rowData
*/
void onEvent(BingLogRowData rowData) throws Exception;
default void onUpdate(UpdateRowsEventData eventData) {
System.out.println("UpdateRowsEventData:" +eventData);
}
default void onDelete(DeleteRowsEventData eventData) {
System.out.println("DeleteRowsEventData:" +eventData);
}
default void onWrite(WriteRowsEventData eventData) {
System.out.println("WriteRowsEventData:" +eventData);
}
上面的 register 方法就是将此监听器注册到handler中的工具
onEvent方法 则是根据 handler 中的事件和监听类型分发处理的方法
其他的几个 默认方法是触发对应的事件后的方法,此处不再多加说明
然后是监听器的实现类
import com.share51.cms.binlog.BinLogListener;
import com.share51.cms.binlog.BingLogRowData;
import com.share51.cms.binlog.BinlogHandler;
import com.share51.cms.entity.Link;
import org.springframework.stereotype.Component;
/**
 * Binlog listener for the friendly-link table {@code cms_study.t_link}.
 *
 * @author clown
 * @version 1.0.0
 */
@Component("linkListener")
public class LinkListener implements BinLogListener {

    private static final String DATA_BASE_NAME = "cms_study";
    private static final String TABLE_NAME = "t_link";

    /** Registers this listener for {@code DATA_BASE_NAME}.{@code TABLE_NAME}. */
    @Override
    public void register(BinlogHandler binlogHandler) {
        binlogHandler.register(DATA_BASE_NAME, TABLE_NAME, this);
    }

    /**
     * Prints the received row data, then the id of the entity built from its before image.
     */
    @Override
    public void onEvent(BingLogRowData eventData) throws Exception {
        final String listenerName = this.getClass().getName();
        System.out.println(listenerName + ":" + eventData);

        final Link before = eventData.getBeforeData(Link.class);
        System.out.println(before.getId());
    }
}
此处不再多加描述,就是将 DATA_BASE_NAME 库和 TABLE_NAME 表注册到处理器中,然后处理其事件
接着是 handler 处理的数据变化后对象
import com.github.shyiko.mysql.binlog.event.EventType;
import com.share51.cms.entity.BaseEntity;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.lang.reflect.Field;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Row data of one binlog row event, with values keyed by column name.
 *
 * @author clown
 * @version 1.0.0
 */
@Getter
@Setter
@ToString
public class BingLogRowData {
    /**
     * Name of the database the change happened in.
     */
    private String dataBaseName;
    /**
     * Name of the table the change happened in.
     */
    private String tableName;
    /**
     * Primary key id of the changed row.
     */
    private Object dataId;
    /**
     * Type of the binlog event.
     */
    private EventType type;
    /**
     * Row data after the change (column name to value, one map per row).
     */
    private List<Map<String, Object>> after;
    /**
     * Row data before the change (column name to value, one map per row).
     */
    private List<Map<String, Object>> before;

    /**
     * Builds an entity from the after data.
     *
     * @param clazz entity class; needs an accessible no-argument constructor
     * @return a new entity; untouched when there is no after data
     * @throws Exception if the entity cannot be instantiated
     */
    public <T extends BaseEntity> T getAfterData(Class<T> clazz) throws Exception {
        return getData(after, clazz);
    }

    /**
     * Builds an entity from the before data.
     *
     * @param clazz entity class; needs an accessible no-argument constructor
     * @return a new entity; untouched when there is no before data
     * @throws Exception if the entity cannot be instantiated
     */
    public <T extends BaseEntity> T getBeforeData(Class<T> clazz) throws Exception {
        return getData(before, clazz);
    }

    /**
     * Creates an instance of {@code clazz} and copies every column value into the field
     * whose name is the camelCase form of the column name. Columns without a matching
     * field are skipped. All row maps are applied to the same instance in list order, so
     * with several rows the values of the last row win.
     */
    private <T extends BaseEntity> T getData(List<Map<String, Object>> data, Class<T> clazz)
            throws ReflectiveOperationException {
        // Class.newInstance() is deprecated; go through the no-arg constructor instead.
        final T t = clazz.getDeclaredConstructor().newInstance();
        if (data == null) {
            return t;
        }
        for (Map<String, Object> row : data) {
            for (Map.Entry<String, Object> column : row.entrySet()) {
                final Field field = findField(clazz, getFieldName(column.getKey()));
                if (field == null) {
                    continue;
                }
                field.setAccessible(true);
                try {
                    field.set(t, column.getValue());
                } catch (IllegalAccessException e) {
                    // Best effort, as before: a field that cannot be written is skipped.
                    System.err.println("BingLogRowData: cannot set field " + field + ": " + e);
                }
            }
        }
        return t;
    }

    /**
     * Looks a declared field up in {@code type} and then in each of its superclasses.
     *
     * @return the field, or {@code null} when no class in the hierarchy declares it
     */
    private static Field findField(Class<?> type, String name) {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            try {
                return current.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // Not declared here; continue with the superclass.
            }
        }
        return null;
    }

    /**
     * Converts an underscore-separated column name to a camelCase field name, e.g.
     * {@code create_time} becomes {@code createTime}. Empty segments (leading, trailing or
     * doubled underscores) are ignored; an empty or underscore-only name yields {@code ""}.
     */
    private String getFieldName(String columnName) {
        final StringBuilder fieldName = new StringBuilder(columnName.length());
        for (String part : columnName.split("_")) {
            if (part.isEmpty()) {
                continue;
            }
            final String head = part.substring(0, 1);
            // Locale.ROOT keeps the conversion independent of the JVM default locale.
            fieldName.append(fieldName.length() == 0
                    ? head.toLowerCase(Locale.ROOT)
                    : head.toUpperCase(Locale.ROOT));
            fieldName.append(part.substring(1));
        }
        return fieldName.toString();
    }
}
上面的代码比较简单,此处就不多加描述,只是数据的处理交换层,相当于vo对象
然后是项目binlog的启动和关闭类
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
* @author clown
* @version 1.0.0
* @ClassName BinlogClient
* @Description binlog 客户端
*/
@Component
public class BinlogClient {
@Resource
private BinaryLogClient binaryLogClient;
@Autowired
private Map<String, BinLogListener> binLogListenerMap;
@Resource
private BinlogHandler binlogHandler;
@PostConstruct
public void start() {
// 注册监听器
final Set<Map.Entry<String, BinLogListener>> entries = binLogListenerMap.entrySet();
for (Map.Entry<String, BinLogListener> entity : entries) {
final BinLogListener listener = entity.getValue();
listener.register(binlogHandler);
}
new Thread(() -> {
binaryLogClient.registerEventListener(binlogHandler);
try {
binaryLogClient.connect();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
@PreDestroy
public void destroy() throws IOException {
if (binaryLogClient != null) {
binaryLogClient.disconnect();
}
}
此处也是比较简单的方法,只不过,此处用到了 spring 一个比较容易被忽视的功能:集合类型的依赖注入。binLogListenerMap 中实际存储的是实现 BinLogListener 接口的所有 bean,此处这么使用的原因是为了快速找到项目中的所有监听器,并且将所有的监听器注册到 handler 中。
然后在子线程中启动这个对象的原因是,BinaryLogClient 的 connect() 会阻塞当前线程,若不在子线程中启动,就会一直阻塞项目启动。
最后就是客户端的配置了
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.deserialization.*;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author clown
* @version 1.0.0
* @ClassName BinlogConfig
* @Description mysql binlog 配置类
*/
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "share51.binlog")
public class BinlogConfig {
private String host;
private Integer port = 3306;
private String username;
private String password;
@Bean
public BinaryLogClient binaryLogClient() {
BinaryLogClient client = new BinaryLogClient(host, port, username, password);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
return client;
}
以上就是 binlog 的主要代码了,想要处理数据缓存的话,可以在每个监听器的 onEvent 中处理对应的缓存更新业务。