上一篇文章我们介绍了 mysql binlog 的基本用法,那么这篇文章就带领大家在项目中优雅地使用 mysql binlog

一、数据库配置

首先打开数据库的 binlog 支持,并且设置为 row 模式
查看是否支持binlog

  1. mysql> show variables like '%bin%';
  2. +--------------------------------------------+--------------------------------------+
  3. | Variable_name | Value |
  4. +--------------------------------------------+--------------------------------------+
  5. | bind_address | * |
  6. | binlog_cache_size | 32768 |
  7. | binlog_checksum | CRC32 |
  8. | binlog_direct_non_transactional_updates | OFF |
  9. | binlog_error_action | ABORT_SERVER |
  10. | binlog_format | ROW |
  11. | binlog_group_commit_sync_delay | 0 |
  12. | binlog_group_commit_sync_no_delay_count | 0 |
  13. | binlog_gtid_simple_recovery | ON |
  14. | binlog_max_flush_queue_time | 0 |
  15. | binlog_order_commits | ON |
  16. | binlog_row_image | FULL |
  17. | binlog_rows_query_log_events | OFF |
  18. | binlog_stmt_cache_size | 32768 |
  19. | binlog_transaction_dependency_history_size | 25000 |
  20. | binlog_transaction_dependency_tracking | COMMIT_ORDER |
  21. | innodb_api_enable_binlog | OFF |
  22. | innodb_locks_unsafe_for_binlog | OFF |
  23. | log_bin | ON |
  24. | log_bin_basename | /data/mysql/mysql-5.7/data/bin |
  25. | log_bin_index | /data/mysql/mysql-5.7/data/bin.index |
  26. | log_bin_trust_function_creators | ON |
  27. | log_bin_use_v1_row_events | OFF |
  28. | log_statements_unsafe_for_binlog | ON |
  29. | max_binlog_cache_size | 18446744073709547520 |
  30. | max_binlog_size | 1073741824 |
  31. | max_binlog_stmt_cache_size | 18446744073709547520 |
  32. | sql_log_bin | ON |
  33. | sync_binlog | 1 |
  34. +--------------------------------------------+--------------------------------------+
  35. 29 rows in set (0.17 sec)

如果不支持,可通过在my.cnf中配置log-bin打开binlog支持,然后重启数据库再次查看是否支持

  1. log-bin=mysql-bin

相关变量

  1. -- Binlog 开关变量
  2. mysql> show variables like 'log_bin';
  3. +---------------+-------+
  4. | Variable_name | Value |
  5. +---------------+-------+
  6. | log_bin | ON |
  7. +---------------+-------+
  8. 1 row in set (0.30 sec)
  9. -- Binlog 日志的格式
  10. mysql> show variables like 'binlog_format';
  11. +---------------+-------+
  12. | Variable_name | Value |
  13. +---------------+-------+
  14. | binlog_format | ROW |
  15. +---------------+-------+
  16. 1 row in set (0.00 sec)

二、创建项目

我们创建的是一个 springboot 的项目,不了解 springboot 的可以网上先了解下
然后我们引入 binlog 的 java 包

  1. <dependency>
  2. <groupId>com.github.shyiko</groupId>
  3. <artifactId>mysql-binlog-connector-java</artifactId>
  4. <version>0.21.0</version>
  5. </dependency>

然后创建监听处理器。项目中的各个监听器都会注册到这个处理器上,由它监听已注册库表的数据库操作,并把数据变更分发给对应的监听器:

  1. import com.github.shyiko.mysql.binlog.BinaryLogClient;
  2. import com.github.shyiko.mysql.binlog.event.*;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.util.StringUtils;
  6. import javax.annotation.Resource;
  7. import java.io.Serializable;
  8. import java.util.*;
  9. import java.util.stream.Collectors;
  10. /**
  11. * @author clown
  12. * @version 1.0.0
  13. * @ClassName BinlogHandler
  14. * @Description binlog 日志处理
  15. */
  16. @Component
  17. @Slf4j
  18. public class BinlogHandler implements BinaryLogClient.EventListener {
  19. private String dataBaseName;
  20. private String tableName;
  21. private final Map<String, BinLogListener> listenerMap = new HashMap<>();
  22. @Resource
  23. private BingLogTableHolder bingLogTableHolder;
  24. private String getKey(String dataBaseName, String tableName) {
  25. return dataBaseName + ":" + tableName;
  26. }
  27. public void register(String dataBaseName, String tableName, BinLogListener binLogListener) {
  28. log.info("binlog handler register: {}-{}", dataBaseName, tableName);
  29. this.listenerMap.put(getKey(dataBaseName, tableName), binLogListener);
  30. bingLogTableHolder.initTable(dataBaseName, tableName);
  31. }
  32. @Override
  33. public void onEvent(Event event) {
  34. final EventType eventType = event.getHeader().getEventType();
  35. if (eventType == EventType.TABLE_MAP) {
  36. TableMapEventData data = event.getData();
  37. this.tableName = data.getTable();
  38. this.dataBaseName = data.getDatabase();
  39. return;
  40. }
  41. if (eventType != EventType.UPDATE_ROWS
  42. && eventType != EventType.WRITE_ROWS
  43. && eventType != EventType.DELETE_ROWS) {
  44. return;
  45. }
  46. if (StringUtils.isEmpty(dataBaseName) || StringUtils.isEmpty(tableName)) {
  47. log.error("binlog handler error, dataBaseName or tableName is empty");
  48. return;
  49. }
  50. // 找出对应表有兴趣的监听器
  51. String key = getKey(this.dataBaseName, this.tableName);
  52. BinLogListener listener = this.listenerMap.get(key);
  53. if (null == listener) {
  54. log.debug("skip {}", key);
  55. return;
  56. }
  57. log.info("trigger event: {}", eventType.name());
  58. try {
  59. BingLogRowData rowData = this.build(event, listener);
  60. rowData.setType(eventType);
  61. rowData.setTableName(tableName);
  62. rowData.setDataBaseName(dataBaseName);
  63. listener.onEvent(rowData);
  64. } catch (Exception e) {
  65. log.error("binlog handler error ,error message:{}", e.getMessage(), e);
  66. } finally {
  67. this.dataBaseName = null;
  68. this.tableName = null;
  69. }
  70. }
  71. private List<Serializable[]> getAfterValues(EventData eventData) {
  72. if (eventData instanceof WriteRowsEventData) {
  73. return ((WriteRowsEventData) eventData).getRows();
  74. }
  75. if (eventData instanceof UpdateRowsEventData) {
  76. return ((UpdateRowsEventData) eventData).getRows()
  77. .stream()
  78. .map(Map.Entry::getValue)
  79. .collect(Collectors.toList());
  80. }
  81. if (eventData instanceof DeleteRowsEventData) {
  82. return ((DeleteRowsEventData) eventData).getRows();
  83. }
  84. return Collections.emptyList();
  85. }
  86. private List<Serializable[]> getBeforeValues(EventData eventData) {
  87. if (eventData instanceof WriteRowsEventData) {
  88. return null;
  89. }
  90. if (eventData instanceof UpdateRowsEventData) {
  91. return ((UpdateRowsEventData) eventData).getRows()
  92. .stream()
  93. .map(Map.Entry::getKey)
  94. .collect(Collectors.toList());
  95. }
  96. if (eventData instanceof DeleteRowsEventData) {
  97. return null;
  98. }
  99. return Collections.emptyList();
  100. }
  101. private BingLogRowData build(Event event, BinLogListener listener) {
  102. final EventData data = event.getData();
  103. BingLogRowData rowData = new BingLogRowData();
  104. if (data instanceof WriteRowsEventData) {
  105. WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
  106. listener.onWrite(writeRowsEventData);
  107. } else if (data instanceof UpdateRowsEventData) {
  108. UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
  109. listener.onUpdate(updateRowsEventData);
  110. } else if (data instanceof DeleteRowsEventData) {
  111. DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;
  112. listener.onDelete(deleteRowsEventData);
  113. }
  114. final Map<Integer, String> table = bingLogTableHolder.getTable(this.dataBaseName, this.tableName);
  115. List<Map<String, Object>> afterMapList = new ArrayList<>();
  116. List<Map<String, Object>> beforeMapList = new ArrayList<>();
  117. final List<Serializable[]> beforeValues = getBeforeValues(data);
  118. final List<Serializable[]> afterValues = getAfterValues(data);
  119. if (afterValues != null) {
  120. for (int i = 0; i < afterValues.size(); i++) {
  121. Map<String, Object> afterMap = new HashMap<>();
  122. Map<String, Object> beforeMap = new HashMap<>();
  123. final Serializable[] after = afterValues.get(i);
  124. Serializable[] before = null;
  125. if (beforeValues != null) {
  126. before = beforeValues.get(i);
  127. }
  128. for (int ix = 0; ix < after.length; ix++) {
  129. String colName = table.get(ix + 1);
  130. if (colName == null) {
  131. continue;
  132. }
  133. Object afterValue = after[ix];
  134. afterMap.put(colName, afterValue);
  135. if ("id".equals(colName)){
  136. rowData.setDataId(afterValue);
  137. }
  138. if (before != null) {
  139. Object beforeValue = before[ix];
  140. beforeMap.put(colName, beforeValue);
  141. beforeMapList.add(beforeMap);
  142. rowData.setBefore(beforeMapList);
  143. }
  144. afterMapList.add(afterMap);
  145. rowData.setAfter(afterMapList);
  146. }
  147. }
  148. }
  149. return rowData;
  150. }
  151. }

上面是 binlog 的处理器,其中 register 方法是别的监听器注册的方法,当别的监听器注册到这个处理器上后,这个处理器就可以处理那些对应数据库数据更新监听,并且初始化其监听的数据库对应的表的字段数据

onEvent 方法则是 BinaryLogClient.EventListener 接口的实现方法:当 BinaryLogClient 监听到事件时会回调此方法,此方法获取监听到的数据,并将数据处理后分发给对应的监听器,让对应监听器处理其对应的数据变动

下面是对应的 binlog 数据库表字段监听的初始化类

  1. import lombok.Getter;
  2. import lombok.Setter;
  3. import org.springframework.jdbc.core.JdbcTemplate;
  4. import org.springframework.jdbc.core.RowMapper;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.Resource;
  7. import java.sql.ResultSet;
  8. import java.sql.SQLException;
  9. import java.util.HashMap;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.stream.Collectors;
  13. /**
  14. * @author clown
  15. * @version 1.0.0
  16. * @ClassName BingLogTableHolder
  17. * @Description binlog 处理的表字段
  18. */
  19. @Component
  20. public class BingLogTableHolder {
  21. @Resource
  22. private JdbcTemplate jdbcTemplate;
  23. private static final String INIT_SQL = "select table_schema, table_name, column_name, " +
  24. "ordinal_position from information_schema.columns where table_schema = ? and table_name = ?";
  25. private static final Map<String, Map<Integer, String>> TABLE_COLUMN_MAP = new HashMap<>();
  26. public void initTable(String databaseName, String tableName) {
  27. final List<TableColumn> query = jdbcTemplate.query(INIT_SQL, new Object[]{databaseName, tableName}, (rs, index) -> new TableColumn().mapRow(rs, index));
  28. final Map<Integer, String> collect = query.stream()
  29. .collect(Collectors.toMap(TableColumn::getPosition, TableColumn::getColumnName));
  30. TABLE_COLUMN_MAP.put(databaseName + ":" + tableName, collect);
  31. }
  32. public Map<Integer, String> getTable(String databaseName, String tableName) {
  33. return TABLE_COLUMN_MAP.get(databaseName + ":" + tableName);
  34. }
  35. @Getter
  36. @Setter
  37. public static class TableColumn implements RowMapper<TableColumn> {
  38. private String columnName;
  39. private Integer position;
  40. //private String tableName;
  41. //private String databaseName;
  42. @Override
  43. public TableColumn mapRow(ResultSet rs, int i) throws SQLException {
  44. // this.databaseName = rs.getString("table_schema");
  45. // this.tableName = rs.getString("table_name");
  46. this.columnName = rs.getString("column_name");
  47. this.position = rs.getInt("ordinal_position");
  48. return this;
  49. }
  50. }
  51. }

上面的类比较简单,核心就两个方法,initTable 就是根据传入的数据库名称和表名称,将表的字段名及其在表中的位置(ordinal_position)查询出来,因为 binlog 只能获取到数据在表中所处的第 n 个字段,所以需要这样处理

getTable就是根据传入的数据库名称和表名称获取当前表中字段的下标
接着就是监听器了

首先是监听器的接口

  1. import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
  2. import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
  3. import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
  4. /**
  5. * @author clown
  6. * @version 1.0.0
  7. * @ClassName BinLogListener
  8. * @Description bin
  9. */
  10. public interface BinLogListener {
  11. /**
  12. * 将监听器注册到处理器中
  13. */
  14. void register(BinlogHandler binlogHandler);
  15. /**
  16. * 当触发这些事件的时候
  17. *
  18. * @param rowData
  19. */
  20. void onEvent(BingLogRowData rowData) throws Exception;
  21. default void onUpdate(UpdateRowsEventData eventData) {
  22. System.out.println("UpdateRowsEventData:" +eventData);
  23. }
  24. default void onDelete(DeleteRowsEventData eventData) {
  25. System.out.println("DeleteRowsEventData:" +eventData);
  26. }
  27. default void onWrite(WriteRowsEventData eventData) {
  28. System.out.println("WriteRowsEventData:" +eventData);
  29. }

上面的 register 方法就是将此监听器注册到handler中的工具
onEvent方法 则是根据 handler 中的事件和监听类型分发处理的方法
其他的几个 默认方法是触发对应的事件后的方法,此处不再多加说明
然后是监听器的实现类

  1. import com.share51.cms.binlog.BinLogListener;
  2. import com.share51.cms.binlog.BingLogRowData;
  3. import com.share51.cms.binlog.BinlogHandler;
  4. import com.share51.cms.entity.Link;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author clown
  8. * @version 1.0.0
  9. * @ClassName LinkListener
  10. * @Description 友情管理监听器
  11. */
  12. @Component("linkListener")
  13. public class LinkListener implements BinLogListener {
  14. private static final String DATA_BASE_NAME = "cms_study";
  15. private static final String TABLE_NAME = "t_link";
  16. @Override
  17. public void register(BinlogHandler binlogHandler) {
  18. binlogHandler.register(DATA_BASE_NAME, TABLE_NAME, this);
  19. }
  20. @Override
  21. public void onEvent(BingLogRowData eventData) throws Exception {
  22. System.out.println(this.getClass().getName() + ":" + eventData);
  23. final Link beforeData = eventData.getBeforeData(Link.class);
  24. System.out.println(beforeData.getId());
  25. }
  26. }

此处不再多加描述,就是将 DATA_BASE_NAME 库和 TABLE_NAME 表注册到处理器中,然后在 onEvent 中处理对应的数据变更
接着是 handler 处理的数据变化后对象

  1. import com.github.shyiko.mysql.binlog.event.EventType;
  2. import com.share51.cms.entity.BaseEntity;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import lombok.ToString;
  6. import java.lang.reflect.Field;
  7. import java.util.*;
  8. import java.util.stream.Collectors;
  9. import java.util.stream.Stream;
  10. /**
  11. * @author clown
  12. * @version 1.0.0
  13. * @ClassName BingLogRowData
  14. * @Description binlog数据
  15. */
  16. @Getter
  17. @Setter
  18. @ToString
  19. public class BingLogRowData {
  20. /**
  21. * 数据库的名称
  22. */
  23. private String dataBaseName;
  24. /**
  25. * 监听到变化表的名称
  26. */
  27. private String tableName;
  28. /**
  29. * 修改数据库的主键 id
  30. */
  31. private Object dataId;
  32. /**
  33. * 监听的 binglog 类型
  34. */
  35. private EventType type;
  36. /**
  37. * 修改前的数据
  38. */
  39. private List<Map<String, Object>> after;
  40. /**
  41. * 修改后的数据
  42. */
  43. private List<Map<String, Object>> before;
  44. public <T extends BaseEntity> T getAfterData(Class<T> clazz) throws Exception {
  45. return getData(after, clazz);
  46. }
  47. public <T extends BaseEntity> T getBeforeData(Class<T> clazz) throws Exception {
  48. return getData(before, clazz);
  49. }
  50. private <T extends BaseEntity> T getData(List<Map<String, Object>> data, Class<T> clazz) throws IllegalAccessException, InstantiationException {
  51. final T t = clazz.newInstance();
  52. if (data != null) {
  53. data.forEach(map -> {
  54. for (String key : map.keySet()) {
  55. final String fieldName = getFieldName(key);
  56. Field declaredField = null;
  57. try {
  58. declaredField = clazz.getDeclaredField(fieldName);
  59. } catch (NoSuchFieldException e) {
  60. try {
  61. declaredField = clazz.getSuperclass().getDeclaredField(fieldName);
  62. } catch (NoSuchFieldException e1) {
  63. e1.printStackTrace();
  64. }
  65. }
  66. try {
  67. if (declaredField != null) {
  68. declaredField.setAccessible(true);
  69. declaredField.set(t, map.get(key));
  70. declaredField.setAccessible(false);
  71. }
  72. } catch (IllegalAccessException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. });
  77. }
  78. return t;
  79. }
  80. private String getFieldName(String columnName) {
  81. final String[] split = columnName.split("_");
  82. final String collect = Stream.of(split)
  83. .map(text ->
  84. text.substring(0, 1).toUpperCase() + text.substring(1, text.length())
  85. ).collect(Collectors.joining());
  86. return collect.substring(0, 1).toLowerCase() + collect.substring(1, collect.length());
  87. }
  88. }

上面的代码比较简单,此处就不多加描述,只是数据的处理交换层,相当于vo对象
然后是项目binlog的启动和关闭类

  import com.github.shyiko.mysql.binlog.BinaryLogClient;
  import lombok.extern.slf4j.Slf4j;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Component;
  import javax.annotation.PostConstruct;
  import javax.annotation.PreDestroy;
  import javax.annotation.Resource;
  import java.io.IOException;
  import java.util.Map;
  import java.util.Set;
  10. /**
  11. * @author clown
  12. * @version 1.0.0
  13. * @ClassName BinlogClient
  14. * @Description binlog 客户端
  15. */
  16. @Component
  17. public class BinlogClient {
  18. @Resource
  19. private BinaryLogClient binaryLogClient;
  20. @Autowired
  21. private Map<String, BinLogListener> binLogListenerMap;
  22. @Resource
  23. private BinlogHandler binlogHandler;
  24. @PostConstruct
  25. public void start() {
  26. // 注册监听器
  27. final Set<Map.Entry<String, BinLogListener>> entries = binLogListenerMap.entrySet();
  28. for (Map.Entry<String, BinLogListener> entity : entries) {
  29. final BinLogListener listener = entity.getValue();
  30. listener.register(binlogHandler);
  31. }
  32. new Thread(() -> {
  33. binaryLogClient.registerEventListener(binlogHandler);
  34. try {
  35. binaryLogClient.connect();
  36. } catch (IOException e) {
  37. e.printStackTrace();
  38. }
  39. }).start();
  40. }
  41. @PreDestroy
  42. public void destroy() throws IOException {
  43. if (binaryLogClient != null) {
  44. binaryLogClient.disconnect();
  45. }
  46. }

此处也是比较简单的方法,只不过,此处用到了 spring 一个比较容易被忽视的功能:按类型注入集合。binLogListenerMap 中实际存储的是实现 BinLogListener 接口的所有 bean(key 为 bean 的名称),此处这么使用的原因是为了快速找到项目中的所有监听器,并且将所有的监听器注册到 handler 中。

然后在线程中启动这个对象的原因是,binaryLogClient.connect() 会阻塞当前线程,若不在子线程中启动,就会一直阻塞项目启动。

最后就是客户端的配置了

  1. import com.github.shyiko.mysql.binlog.BinaryLogClient;
  2. import com.github.shyiko.mysql.binlog.event.deserialization.*;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import org.springframework.boot.context.properties.ConfigurationProperties;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @author clown
  10. * @version 1.0.0
  11. * @ClassName BinlogConfig
  12. * @Description mysql binlog 配置类
  13. */
  14. @Getter
  15. @Setter
  16. @Configuration
  17. @ConfigurationProperties(prefix = "share51.binlog")
  18. public class BinlogConfig {
  19. private String host;
  20. private Integer port = 3306;
  21. private String username;
  22. private String password;
  23. @Bean
  24. public BinaryLogClient binaryLogClient() {
  25. BinaryLogClient client = new BinaryLogClient(host, port, username, password);
  26. EventDeserializer eventDeserializer = new EventDeserializer();
  27. eventDeserializer.setCompatibilityMode(
  28. EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
  29. EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
  30. );
  31. return client;
  32. }

以上就是 binlog 的主要代码了,想要处理数据缓存的话,可以在每个监听器的 onEvent 中处理对应的缓存更新业务。