上一篇文章我们介绍了 MySQL binlog 的基本用法,这篇文章就带领大家在项目中优雅地使用 MySQL binlog
一、数据库配置
首先打开数据库的 binlog 支持,并且设置为 row 模式
查看是否支持binlog
mysql> show variables like '%bin%';+--------------------------------------------+--------------------------------------+| Variable_name | Value |+--------------------------------------------+--------------------------------------+| bind_address | * || binlog_cache_size | 32768 || binlog_checksum | CRC32 || binlog_direct_non_transactional_updates | OFF || binlog_error_action | ABORT_SERVER || binlog_format | ROW || binlog_group_commit_sync_delay | 0 || binlog_group_commit_sync_no_delay_count | 0 || binlog_gtid_simple_recovery | ON || binlog_max_flush_queue_time | 0 || binlog_order_commits | ON || binlog_row_image | FULL || binlog_rows_query_log_events | OFF || binlog_stmt_cache_size | 32768 || binlog_transaction_dependency_history_size | 25000 || binlog_transaction_dependency_tracking | COMMIT_ORDER || innodb_api_enable_binlog | OFF || innodb_locks_unsafe_for_binlog | OFF || log_bin | ON || log_bin_basename | /data/mysql/mysql-5.7/data/bin || log_bin_index | /data/mysql/mysql-5.7/data/bin.index || log_bin_trust_function_creators | ON || log_bin_use_v1_row_events | OFF || log_statements_unsafe_for_binlog | ON || max_binlog_cache_size | 18446744073709547520 || max_binlog_size | 1073741824 || max_binlog_stmt_cache_size | 18446744073709547520 || sql_log_bin | ON || sync_binlog | 1 |+--------------------------------------------+--------------------------------------+29 rows in set (0.17 sec)
如果不支持,可通过在my.cnf中配置log-bin打开binlog支持,然后重启数据库再次查看是否支持
log-bin=mysql-bin
相关变量
-- Binlog 开关变量mysql> show variables like 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin | ON |+---------------+-------+1 row in set (0.30 sec)-- Binlog 日志的格式mysql> show variables like 'binlog_format';+---------------+-------+| Variable_name | Value |+---------------+-------+| binlog_format | ROW |+---------------+-------+1 row in set (0.00 sec)
二、创建项目
我们创建的是一个 springboot 的项目,不了解 springboot 的可以网上先了解下
然后我们引入 binlog 的 java 包
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.21.0</version></dependency>
然后创建监听处理器。项目中的各个监听器会注册到这个处理器上,处理器负责监听 binlog 中与已注册库表相关的数据库操作:
import com.github.shyiko.mysql.binlog.BinaryLogClient;import com.github.shyiko.mysql.binlog.event.*;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;import javax.annotation.Resource;import java.io.Serializable;import java.util.*;import java.util.stream.Collectors;/*** @author clown* @version 1.0.0* @ClassName BinlogHandler* @Description binlog 日志处理*/@Component@Slf4jpublic class BinlogHandler implements BinaryLogClient.EventListener {private String dataBaseName;private String tableName;private final Map<String, BinLogListener> listenerMap = new HashMap<>();@Resourceprivate BingLogTableHolder bingLogTableHolder;private String getKey(String dataBaseName, String tableName) {return dataBaseName + ":" + tableName;}public void register(String dataBaseName, String tableName, BinLogListener binLogListener) {log.info("binlog handler register: {}-{}", dataBaseName, tableName);this.listenerMap.put(getKey(dataBaseName, tableName), binLogListener);bingLogTableHolder.initTable(dataBaseName, tableName);}@Overridepublic void onEvent(Event event) {final EventType eventType = event.getHeader().getEventType();if (eventType == EventType.TABLE_MAP) {TableMapEventData data = event.getData();this.tableName = data.getTable();this.dataBaseName = data.getDatabase();return;}if (eventType != EventType.UPDATE_ROWS&& eventType != EventType.WRITE_ROWS&& eventType != EventType.DELETE_ROWS) {return;}if (StringUtils.isEmpty(dataBaseName) || StringUtils.isEmpty(tableName)) {log.error("binlog handler error, dataBaseName or tableName is empty");return;}// 找出对应表有兴趣的监听器String key = getKey(this.dataBaseName, this.tableName);BinLogListener listener = this.listenerMap.get(key);if (null == listener) {log.debug("skip {}", key);return;}log.info("trigger event: {}", eventType.name());try {BingLogRowData rowData = this.build(event, 
listener);rowData.setType(eventType);rowData.setTableName(tableName);rowData.setDataBaseName(dataBaseName);listener.onEvent(rowData);} catch (Exception e) {log.error("binlog handler error ,error message:{}", e.getMessage(), e);} finally {this.dataBaseName = null;this.tableName = null;}}private List<Serializable[]> getAfterValues(EventData eventData) {if (eventData instanceof WriteRowsEventData) {return ((WriteRowsEventData) eventData).getRows();}if (eventData instanceof UpdateRowsEventData) {return ((UpdateRowsEventData) eventData).getRows().stream().map(Map.Entry::getValue).collect(Collectors.toList());}if (eventData instanceof DeleteRowsEventData) {return ((DeleteRowsEventData) eventData).getRows();}return Collections.emptyList();}private List<Serializable[]> getBeforeValues(EventData eventData) {if (eventData instanceof WriteRowsEventData) {return null;}if (eventData instanceof UpdateRowsEventData) {return ((UpdateRowsEventData) eventData).getRows().stream().map(Map.Entry::getKey).collect(Collectors.toList());}if (eventData instanceof DeleteRowsEventData) {return null;}return Collections.emptyList();}private BingLogRowData build(Event event, BinLogListener listener) {final EventData data = event.getData();BingLogRowData rowData = new BingLogRowData();if (data instanceof WriteRowsEventData) {WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;listener.onWrite(writeRowsEventData);} else if (data instanceof UpdateRowsEventData) {UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;listener.onUpdate(updateRowsEventData);} else if (data instanceof DeleteRowsEventData) {DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;listener.onDelete(deleteRowsEventData);}final Map<Integer, String> table = bingLogTableHolder.getTable(this.dataBaseName, this.tableName);List<Map<String, Object>> afterMapList = new ArrayList<>();List<Map<String, Object>> beforeMapList = new ArrayList<>();final List<Serializable[]> beforeValues = 
getBeforeValues(data);final List<Serializable[]> afterValues = getAfterValues(data);if (afterValues != null) {for (int i = 0; i < afterValues.size(); i++) {Map<String, Object> afterMap = new HashMap<>();Map<String, Object> beforeMap = new HashMap<>();final Serializable[] after = afterValues.get(i);Serializable[] before = null;if (beforeValues != null) {before = beforeValues.get(i);}for (int ix = 0; ix < after.length; ix++) {String colName = table.get(ix + 1);if (colName == null) {continue;}Object afterValue = after[ix];afterMap.put(colName, afterValue);if ("id".equals(colName)){rowData.setDataId(afterValue);}if (before != null) {Object beforeValue = before[ix];beforeMap.put(colName, beforeValue);beforeMapList.add(beforeMap);rowData.setBefore(beforeMapList);}afterMapList.add(afterMap);rowData.setAfter(afterMapList);}}}return rowData;}}
上面是 binlog 的处理器,其中 register 方法是别的监听器注册的方法,当别的监听器注册到这个处理器上后,这个处理器就可以处理那些对应数据库数据更新监听,并且初始化其监听的数据库对应的表的字段数据
onEvent 方法则是 BinaryLogClient.EventListener 接口的实现:当 BinaryLogClient 监听到 binlog 事件时会回调此方法,此方法获取监听到的数据,并将数据处理后分发给对应的监听器,让对应监听器处理其对应的数据变动
下面是对应的 binlog 数据库表字段监听的初始化类
import lombok.Getter;import lombok.Setter;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.jdbc.core.RowMapper;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.sql.ResultSet;import java.sql.SQLException;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;/*** @author clown* @version 1.0.0* @ClassName BingLogTableHolder* @Description binlog 处理的表字段*/@Componentpublic class BingLogTableHolder {@Resourceprivate JdbcTemplate jdbcTemplate;private static final String INIT_SQL = "select table_schema, table_name, column_name, " +"ordinal_position from information_schema.columns where table_schema = ? and table_name = ?";private static final Map<String, Map<Integer, String>> TABLE_COLUMN_MAP = new HashMap<>();public void initTable(String databaseName, String tableName) {final List<TableColumn> query = jdbcTemplate.query(INIT_SQL, new Object[]{databaseName, tableName}, (rs, index) -> new TableColumn().mapRow(rs, index));final Map<Integer, String> collect = query.stream().collect(Collectors.toMap(TableColumn::getPosition, TableColumn::getColumnName));TABLE_COLUMN_MAP.put(databaseName + ":" + tableName, collect);}public Map<Integer, String> getTable(String databaseName, String tableName) {return TABLE_COLUMN_MAP.get(databaseName + ":" + tableName);}@Getter@Setterpublic static class TableColumn implements RowMapper<TableColumn> {private String columnName;private Integer position;//private String tableName;//private String databaseName;@Overridepublic TableColumn mapRow(ResultSet rs, int i) throws SQLException {// this.databaseName = rs.getString("table_schema");// this.tableName = rs.getString("table_name");this.columnName = rs.getString("column_name");this.position = rs.getInt("ordinal_position");return this;}}}
上面的类比较简单,核心就两个方法:initTable 根据传入的数据库名称和表名称,将表中各字段的字段名及其在表中所处的位置(position)查询出来并缓存。因为 binlog 只能获取到数据在表中所处的第 n 个字段,而拿不到字段名,所以需要这样处理
getTable 就是根据传入的数据库名称和表名称,获取当前表中「字段位置 → 字段名」的映射
接着就是监听器了
首先是监听器的接口
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;/*** @author clown* @version 1.0.0* @ClassName BinLogListener* @Description bin*/public interface BinLogListener {/*** 将监听器注册到处理器中*/void register(BinlogHandler binlogHandler);/*** 当触发这些事件的时候** @param rowData*/void onEvent(BingLogRowData rowData) throws Exception;default void onUpdate(UpdateRowsEventData eventData) {System.out.println("UpdateRowsEventData:" +eventData);}default void onDelete(DeleteRowsEventData eventData) {System.out.println("DeleteRowsEventData:" +eventData);}default void onWrite(WriteRowsEventData eventData) {System.out.println("WriteRowsEventData:" +eventData);}
上面的 register 方法就是将此监听器注册到handler中的工具
onEvent方法 则是根据 handler 中的事件和监听类型分发处理的方法
其他的几个 默认方法是触发对应的事件后的方法,此处不再多加说明
然后是监听器的实现类
import com.share51.cms.binlog.BinLogListener;import com.share51.cms.binlog.BingLogRowData;import com.share51.cms.binlog.BinlogHandler;import com.share51.cms.entity.Link;import org.springframework.stereotype.Component;/*** @author clown* @version 1.0.0* @ClassName LinkListener* @Description 友情管理监听器*/@Component("linkListener")public class LinkListener implements BinLogListener {private static final String DATA_BASE_NAME = "cms_study";private static final String TABLE_NAME = "t_link";@Overridepublic void register(BinlogHandler binlogHandler) {binlogHandler.register(DATA_BASE_NAME, TABLE_NAME, this);}@Overridepublic void onEvent(BingLogRowData eventData) throws Exception {System.out.println(this.getClass().getName() + ":" + eventData);final Link beforeData = eventData.getBeforeData(Link.class);System.out.println(beforeData.getId());}}
此处不再多加描述,就是将监听器针对 DATA_BASE_NAME 库中的 TABLE_NAME 表注册到处理器(handler)中,然后在 onEvent 中处理该表的数据变更
接着是 handler 处理的数据变化后对象
import com.github.shyiko.mysql.binlog.event.EventType;import com.share51.cms.entity.BaseEntity;import lombok.Getter;import lombok.Setter;import lombok.ToString;import java.lang.reflect.Field;import java.util.*;import java.util.stream.Collectors;import java.util.stream.Stream;/*** @author clown* @version 1.0.0* @ClassName BingLogRowData* @Description binlog数据*/@Getter@Setter@ToStringpublic class BingLogRowData {/*** 数据库的名称*/private String dataBaseName;/*** 监听到变化表的名称*/private String tableName;/*** 修改数据库的主键 id*/private Object dataId;/*** 监听的 binglog 类型*/private EventType type;/*** 修改前的数据*/private List<Map<String, Object>> after;/*** 修改后的数据*/private List<Map<String, Object>> before;public <T extends BaseEntity> T getAfterData(Class<T> clazz) throws Exception {return getData(after, clazz);}public <T extends BaseEntity> T getBeforeData(Class<T> clazz) throws Exception {return getData(before, clazz);}private <T extends BaseEntity> T getData(List<Map<String, Object>> data, Class<T> clazz) throws IllegalAccessException, InstantiationException {final T t = clazz.newInstance();if (data != null) {data.forEach(map -> {for (String key : map.keySet()) {final String fieldName = getFieldName(key);Field declaredField = null;try {declaredField = clazz.getDeclaredField(fieldName);} catch (NoSuchFieldException e) {try {declaredField = clazz.getSuperclass().getDeclaredField(fieldName);} catch (NoSuchFieldException e1) {e1.printStackTrace();}}try {if (declaredField != null) {declaredField.setAccessible(true);declaredField.set(t, map.get(key));declaredField.setAccessible(false);}} catch (IllegalAccessException e) {e.printStackTrace();}}});}return t;}private String getFieldName(String columnName) {final String[] split = columnName.split("_");final String collect = Stream.of(split).map(text ->text.substring(0, 1).toUpperCase() + text.substring(1, text.length())).collect(Collectors.joining());return collect.substring(0, 1).toLowerCase() + collect.substring(1, collect.length());}}
上面的代码比较简单,此处就不多加描述,只是数据的处理交换层,相当于vo对象
然后是项目binlog的启动和关闭类
import com.github.shyiko.mysql.binlog.BinaryLogClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import javax.annotation.Resource;import java.io.IOException;import java.util.Map;import java.util.Set;/*** @author clown* @version 1.0.0* @ClassName BinlogClient* @Description binlog 客户端*/@Componentpublic class BinlogClient {@Resourceprivate BinaryLogClient binaryLogClient;@Autowiredprivate Map<String, BinLogListener> binLogListenerMap;@Resourceprivate BinlogHandler binlogHandler;@PostConstructpublic void start() {// 注册监听器final Set<Map.Entry<String, BinLogListener>> entries = binLogListenerMap.entrySet();for (Map.Entry<String, BinLogListener> entity : entries) {final BinLogListener listener = entity.getValue();listener.register(binlogHandler);}new Thread(() -> {binaryLogClient.registerEventListener(binlogHandler);try {binaryLogClient.connect();} catch (IOException e) {e.printStackTrace();}}).start();}@PreDestroypublic void destroy() throws IOException {if (binaryLogClient != null) {binaryLogClient.disconnect();}}
此处也是比较简单的方法,只不过用到了 spring 一个比较容易被忽视的功能:按类型注入集合。binLogListenerMap 中实际存储的是实现 BinLogListener 接口的所有 bean(key 为 bean 的名称),此处这么使用的原因是为了快速找到项目中的所有监听器,并且将所有的监听器注册到 handler 中。
然后在线程中启动这个对象的原因是,binlog 会造成线程的堵塞,若不在子线程中启动,就会一直堵塞项目启动。
最后就是客户端的配置了
import com.github.shyiko.mysql.binlog.BinaryLogClient;import com.github.shyiko.mysql.binlog.event.deserialization.*;import lombok.Getter;import lombok.Setter;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @author clown* @version 1.0.0* @ClassName BinlogConfig* @Description mysql binlog 配置类*/@Getter@Setter@Configuration@ConfigurationProperties(prefix = "share51.binlog")public class BinlogConfig {private String host;private Integer port = 3306;private String username;private String password;@Beanpublic BinaryLogClient binaryLogClient() {BinaryLogClient client = new BinaryLogClient(host, port, username, password);EventDeserializer eventDeserializer = new EventDeserializer();eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);return client;}
以上就是 binlog 的主要代码了,想要处理数据缓存的话,可以在每个监听器的 onEvent 中处理对应的缓存更新业务。
