4、Event Store设计

支持多种存储模式,比如Memory内存模式。采用内存环装的设计来保存消息,借鉴了Disruptor的RingBuffer的实现思路。 RingBuffer设计:
image.png
定义了3个cursor:
put:Sink模块进行数据存储的最后一次写入位置(同步写入数据的cursor)
get:数据订阅获取的最后一次提取位置(同步获取的数据的cursor)
ack:数据消费成功的最后一次消费位置
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
5、Canal~EventStore - 图2
实现说明:
put/get/ack cursor用于递增,采用long型存储。三者之间的关系为put>=get>=ackbuffer的get操作,通过取余或者&操作。(&操作:cusor & (size - 1) , size需要为2的指数,效率比较高)
4、Instance设计
image.png
instance代表了一个实际运行的数据队列,包括了EventPaser、EventSink、EventStore等组件。抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:
manager方式:和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
spring方式:基于spring xml + properties进行定义,构建spring配置。 Server设计:
5、Canal~EventStore - 图4
server代表了一个Canal运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式)/Netty(网络访问)的两种实现。
增量订阅/消费设计:
5、Canal~EventStore - 图5
具体的协议格式,可参见:CanalProtocol.proto。数据对象格式:EntryProtocol.proto

  1. 具体的协议格式,可参见:CanalProtocol.proto。数据对象格式:EntryProtocol.proto
  2. Entry
  3. Header
  4. logfileName [binlog文件名]
  5. logfileOffset [binlog position]
  6. executeTime [binlog里记录变更发生的时间戳]
  7. schemaName [数据库实例]
  8. tableName [表名]
  9. eventType [insert/update/delete类型]
  10. entryType [事务头BEGIN/事务尾END/数据ROWDATA]
  11. storeValue [byte数据,可展开,对应的类型为RowChange]
  12. RowChange
  13. isDdl [是否是ddl变更操作,比如create table/drop table]
  14. sql [具体的ddl sql]
  15. rowDatas [具体insert/update/delete的变更数据,可为多条,1binlog event事件可对应多条变更,比如批处理]
  16. beforeColumns [Column类型的数组]
  17. afterColumns [Column类型的数组]
  18. Column
  19. index [column序号]
  20. sqlType [jdbc type]
  21. name [column name]
  22. isKey [是否为主键]
  23. updated [是否发生过变更]
  24. isNull [值是否为null]
  25. value [具体的内容,注意为文本]
  26. Entry
  27. Header
  28. logfileName [binlog文件名]
  29. logfileOffset [binlog position]
  30. executeTime [binlog里记录变更发生的时间戳]
  31. schemaName [数据库实例]
  32. tableName [表名]
  33. eventType [insert/update/delete类型]
  34. entryType [事务头BEGIN/事务尾END/数据ROWDATA]
  35. storeValue [byte数据,可展开,对应的类型为RowChange]
  36. RowChange
  37. isDdl [是否是ddl变更操作,比如create table/drop table]
  38. sql [具体的ddl sql]
  39. rowDatas [具体insert/update/delete的变更数据,可为多条,1binlog event事件可对应多条变更,比如批处理]
  40. beforeColumns [Column类型的数组]
  41. afterColumns [Column类型的数组]
  42. Column
  43. index [column序号]
  44. sqlType [jdbc type]
  45. name [column name]
  46. isKey [是否为主键]
  47. updated [是否发生过变更]
  48. isNull [值是否为null]
  49. value [具体的内容,注意为文本]
  50. 针对上述的补充说明:
  51. 1.可以提供数据库变更前和变更后的字段内容,针对binlog中没有的nameisKey等信息进行补全
  52. 2.可以提供ddl的变更语句

针对上述的补充说明:
1.可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name、isKey等信息进行补全
2.可以提供ddl的变更语句