1 分库分表实战

1.1 背景

刚开始我们的系统只用了单机数据库

随着用户的不断增多,考虑到系统的高可用和越来越多的用户请求,我们开始使用数据库主从架构

当用户量级和业务进一步提升后,写请求越来越多,这时我们开始使用了分库分表

1.2 遇到的问题

  • 用户请求量太大
    单服务器TPS、内存、IO都是有上限的,需要将请求打散分布到多个服务器

  • 单库数据量太大
    单个数据库处理能力有限;单库所在服务器的磁盘空间有限;单库上的操作IO有瓶颈

  • 单表数据量太大
    查询、插入、更新操作都会变慢,再加字段、加索引、机器迁移都会产生高负载,影响服务

1.3 如何解决

垂直拆分

垂直分库 微服务架构时,业务切割得足够独立,数据也会按照业务切分,保证业务数据隔离,大大提 升了数据库的吞吐能力

Mysql下笔记 - 图1

垂直分表 表中字段太多且包含大字段的时候,在查询时对数据库的IO、内存会受到影响,同时更新数 据时,产生的binlog文件会很大,MySQL在主从同步时也会有延迟的风险

Mysql下笔记 - 图2

水平拆分

水平分表 针对数据量巨大的单张表(比如订单表),按照规则把一张表的数据切分到多张表里面去。 但是这些表还是在同一个库中,所以库级别的数据库操作还是有IO瓶颈。

Mysql下笔记 - 图3

水平分库 将单张表的数据切分到多个服务器上去,每个服务器具有相应的库与表,只是表中数据集合 不同。 水平分库分表能够有效的缓解单机和单库的性能瓶颈和压力,突破IO、连接数、硬件 资源等的瓶颈

Mysql下笔记 - 图4

水平分库规则

使用的时候需要注意:

不跨库、不跨表,保证同一类的数据都在同一个服务器上面。

数据在切分之前,需要考虑如何高效的进行数据获取,如果每次查询都要跨越多个节点,就需要谨慎使用。

水平分表规则

  • RANGE

    • 时间:按照年、月、日去切分。例如order_2020、order_202005、order_20200501

    • 地域:按照省或市去切分。例如order_beijing、order_shanghai、order_chengdu

    • 大小:从0到1000000一个表。例如1000001-2000000放一个表,每100万放一个表

  • Hash (根据实际业务情况采用适合的hash算法)

    • 用户id取模
      不同的业务使用的切分规则是不一样,就上面提到的切分规则
      比如:

      站内信

①用户维度:用户只能看到发送给自己的消息,其他用户是不可见的,这种情况下是按照 用户ID hash分库,在用户查看历史记录翻页查询时,所有的查询请求都在同一个库内

用户表:以用户ID为划分依据,将数据水平切分到两个数据库实例,

① 范围法:如:1到1000W在 一张表,1000W到2000W在一张表,这种情况会出现单表的负载较高

② 按照用户ID HASH尽量保证用户数据均衡分到数据库中

如果在登录场景下,用户输入手机号和验证码进行登录,这种情况下,登录时是不是需要扫描所有分库的信息?

最终方案:用户信息采用ID做切分处理,同时存储用户ID和手机号的映射的关系 表(新增一个关系表),关系表采用手机号进行切分。可以通过关系表根据手机号查询到对应的ID,再定位用户信息。

流水表

① 时间维度:可以根据每天新增的流水来判断,选择按照年份分库,还是按照月份分库, 甚至也可以按照日期分库

订单表

在拉勾网,求职者(下面统称C端用户)投递企业(下面统称B端用户)的职位产生的记录称 之为订单表。在线上的业务场景中,C端用户看自己的投递记录,每次的投递到了哪个状态, B端用户查看自己收到的简历,对于合适的简历会进行下一步沟通,同一个公司内的员工可以 协作处理简历。

如何能同时满足C端和B端对数据查询,不进行跨库处理?

最终方案:为了同时满足两端用户的业务场景,采用空间换时间,将一次的投递记录存为两 份,C端的投递记录以用户ID为分片键,B端收到的简历按照公司ID为分片键

Mysql下笔记 - 图5

  • 主键选择
    UUID:本地生成,不依赖数据库,缺点就是作为主键性能太差
    SNOWFLAKE:百度UidGenerator、美团Leaf、基于SNOWFLAKE算法实现

  • 数据一致性
    强一致性:XA协议
    最终一致性:TCC、saga、Seata

  • 数据库扩容
    成倍增加数据节点,实现平滑扩容
    成倍扩容以后,表中的部分数据请求已被路由到其他节点上面,可以清理掉

  • 业务层改造
    基于代理层方式:Mycat、Sharding-Proxy、MySQL Proxy
    基于应用层方式:Sharding-jdbc

  • 分库后面临的问题
    事务问题:一次投递需要插入两条记录,且分布在不同的服务器上,数据需要保障一致性。
    跨库表的join问题:

    全局表(字典表):基础数据/配置数据,所有库都拷贝一份

字段冗余:可以使用字段冗余就不用join查询了

系统层组装:可以在业务层分别查询出来,然后组装起来,逻辑较复杂


额外的数据管理负担和数据运算压力:数据库扩容、维护成本变高

2 ShardingSphere

对于 5.0版本是一个大版本的迭代,改动很大。

2.1 概念

Apache ShardingSphere是一款开源的分布式数据库中间件组成的生态圈。它由Sharding-JDBC、 Sharding-Proxy和Sharding-Sidecar(规划中)这3款相互独立的产品组成。 他们均提供标准化的数据 分片、分布式事务和数据库治理功能,可适用于如Java同构、异构语言、容器、云原生等各种多样化的 应用场景。

ShardingSphere定位为关系型数据库中间件,旨在充分合理地在分布式的场景下利用关系型数据库的 计算和存储能力,而并非实现一个全新的关系型数据库。

Mysql下笔记 - 图6

  • Sharding-JDBC:被定位为轻量级Java框架,在Java的JDBC层提供的额外服务,以jar包形式使用。
  • Sharding-Proxy:被定位为透明化的数据库代理端,提供封装了数据库二进制协议的服务端版 本,用于完成对异构语言的支持。
  • Sharding-Sidecar:被定位为Kubernetes或Mesos的云原生数据库代理,以DaemonSet的形式代 理所有对数据库的访问。

Mysql下笔记 - 图7

Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar三者区别如下:

Mysql下笔记 - 图8

2.2 ShardingJdbc

Sharding-JDBC定位为轻量级Java框架,在Java的JDBC层提供的额外服务。 它使用客户端直连数据库, 以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM 框架的使用。

  • 适用于任何基于Java的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使 用JDBC。
  • 基于任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
  • 支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer和PostgreSQL。

Mysql下笔记 - 图9

Sharding-JDBC主要功能:

  • 数据分片
    分库、分表
    读写分离
    分片策略
    分布式主键

  • 分布式事务
    标准化的事务接口 Local
    XA强一致性事务 XA
    柔性事务 BASE

  • 数据库治理
    配置动态化
    编排和治理
    数据脱敏
    可视化链路追踪

2.3 ShardingJdbc 内部结构

Mysql下笔记 - 图10

各组件及其功能

  • 图中黄色部分表示的是Sharding-JDBC的入口API,采用工厂方法的形式提供。 目前有 ShardingDataSourceFactory和MasterSlaveDataSourceFactory两个工厂类。

    • ShardingDataSourceFactory支持分库分表、读写分离操作 这个需要同时配置在 ShardingRuleConfiguration中同时配置 TableRuleConfiguration和MasterSlaveConfiguaration了。因为我们分出来的节点可能会对应有从库,读操作需要从从库进行读取 。

    • MasterSlaveDataSourceFactory支持读写分离操作

  • 图中蓝色部分表示的是Sharding-JDBC的配置对象,提供灵活多变的配置方式。 ShardingRuleConfiguration是分库分表配置的核心和入口,它可以包含多个 TableRuleConfiguration和MasterSlaveRuleConfiguration。

    • TableRuleConfiguration封装的是表的分片配置信息,有5种配置形式对应不同的 Configuration类型。
    • MasterSlaveRuleConfiguration封装的是读写分离配置信息
  • 图中红色部分表示的是内部对象,由Sharding-JDBC内部使用,应用开发者无需关注。ShardingJDBC通过ShardingRuleConfiguration和MasterSlaveRuleConfiguration生成真正供 ShardingDataSource和MasterSlaveDataSource使用的规则对象。ShardingDataSource和 MasterSlaveDataSource实现了DataSource接口,是JDBC的完整实现方案。

Sharding-JDBC初始化流程

  • 根据配置的信息生成Configuration对象
  • 通过Factory会将Configuration对象转化为Rule对象
  • 通过Factory会将Rule对象与DataSource对象封装
  • Sharding-JDBC使用DataSource进行分库分表和读写分离操作

2.4 使用流程

1,引入依赖

  1. <dependency>
  2. <groupId>org.apache.shardingsphere</groupId>
  3. <artifactId>sharding-jdbc-core</artifactId>
  4. </dependency>

2,规则配置

Sharding-JDBC可以通过Java,YAML,Spring命名空间和Spring Boot Starter四种方式配置,开 发者可根据场景选择适合的配置方式。

3,创建DataSource

通过ShardingDataSourceFactory工厂和规则配置对象获取ShardingDataSource,然后即可通过 DataSource选择使用原生JDBC开发,或者使用JPA, MyBatis等ORM工具。

  1. DataSource dataSource =
  2. ShardingDataSourceFactory.createDataSource(dataSourceMap,
  3. shardingRuleConfig, props);

4 数据分片实战

4.1 数据分片核心概念

表概念

真实表:数据库中真实存在的物理表。例如b_order0、b_order1

逻辑表:在分片之后,同一类表结构的名称(总成)。例如b_order。

数据节点:在分片之后,由数据源和数据表组成。例如ds0.b_order1

绑定表指的是分片规则一致的关系表(主表、子表),例如b_order和b_order_item,均按照 order_id分片,则此两个表互为绑定表关系。绑定表之间的多表关联查询不会出现笛卡尔积 关联,可以提升关联查询效率。

  1. b_orderb_order0b_order1
  2. b_order_itemb_order_item0b_order_item1
  3. select * from b_order o join b_order_item i on(o.order_id=i.order_id)
  4. where o.order_id in (10,11);

比如10在b_order0里面,11在b_order1里面

如果不配置绑定表关系,采用笛卡尔积关联,会生成4个SQL

  1. select * from b_order0 o join b_order_item0 i on(o.order_id=i.order_id)
  2. where o.order_id in (10,11);
  3. select * from b_order0 o join b_order_item1 i on(o.order_id=i.order_id)
  4. where o.order_id in (10,11);
  5. select * from b_order1 o join b_order_item0 i on(o.order_id=i.order_id)
  6. where o.order_id in (10,11);
  7. select * from b_order1 o join b_order_item1 i on(o.order_id=i.order_id)
  8. where o.order_id in (10,11);

如果配置绑定表关系,生成2个SQL

  1. select * from b_order0 o join b_order_item0 i on(o.order_id=i.order_id)
  2. where o.order_id in (10,11);
  3. select * from b_order1 o join b_order_item1 i on(o.order_id=i.order_id)
  4. where o.order_id in (10,11);

广播表:在使用中,有些表没必要做分片,例如字典表、省份信息等,因为他们数据量不大,而且这 种表可能需要与海量数据的表进行关联查询。广播表会在不同的数据节点上进行存储,存储 的表结构和数据完全相同。

分片算法 ShardingAlgorithm

由于分片算法和业务实现紧密相关,因此并未提供内置分片算法,而是通过分片策略将各种场景提 炼出来,提供更高层级的抽象,并提供接口让应用开发者自行实现分片算法。目前提供4种分片算 法。

  • 精确分片算法PreciseShardingAlgorithm 用于处理使用单一键作为分片键的=与IN进行分片的场景。
  • 范围分片算法 RangeShardingAlgorithm 用于处理使用单一键作为分片键的BETWEEN AND、>、<、>=、<=进行分片的场景。
  • 复合分片算法ComplexKeysShardingAlgorithm 用于处理使用多键作为分片键进行分片的场景,多个分片键的逻辑较复杂,需要应用开发者自行处理其中的复杂度。
  • Hint分片算法HintShardingAlgorithm 用于处理使用Hint行分片的场景。对于分片字段非SQL决定,而由其他外置条件决定的场景,可使用SQL Hint灵活的注入分片字段。例:内部系统,按照员工登录主键分库,而数据 库中并无此字段。SQL Hint支持通过Java API和SQL注释两种方式使用。

分片策略 ShardingStrategy

分片策略包含分片键和分片算法,真正可用于分片操作的是分片键 + 分片算法,也就是分片策 略。目前提供5种分片策略。

  • 标准分片策略StandardShardingStrategy 只支持单分片键,提供对SQL语句中的=, >, <, >=, <=, IN和BETWEEN AND的分片操作支持。
    提供PreciseShardingAlgorithm和RangeShardingAlgorithm两个分片算法。 PreciseShardingAlgorithm是必选的,RangeShardingAlgorithm是可选的。但是SQL中使用 了范围操作,如果不配置RangeShardingAlgorithm会采用全库路由扫描,效率低。

  • 复合分片策略ComplexShardingStrategy
    支持多分片键。提供对SQL语句中的=, >, <, >=, <=, IN和BETWEEN AND的分片操作支持。由 于多分片键之间的关系复杂,因此并未进行过多的封装,而是直接将分片键值组合以及分片 操作符透传至分片算法,完全由应用开发者实现,提供最大的灵活度

  • 行表达式分片策略InlineShardingStrategy
    只支持单分片键。使用Groovy的表达式,提供对SQL语句中的=和IN的分片操作支持,对于简单的分片算法,可以通过简单的配置使用,从而避免繁琐的Java代码开发。如: tuser$-> {u_id % 8} 表示t_user表根据u_id模8,而分成8张表,表名称为t_user_0到t_user_7。

  • Hint分片策略HintShardingStrategy
    通过Hint指定分片值而非从SQL中提取分片值的方式进行分片的策略。

  • 不分片策略NoneShardingStrategy
    不分片的策略。

分片策略配置:

对于分片策略存有数据源分片策略和表分片策略两种维度,两种策略的API完全相同。

  • 数据源分片策略 用于配置数据被分配的目标数据源。

  • 表分片策略用于配置数据被分配的目标表,由于表存在与数据源内,所以表分片策略是依赖数据源分片策略结果的。

4.2 数据分片流程剖析

执行流程:

Mysql下笔记 - 图11

  • SQL解析 SQL解析分为词法解析和语法解析。
    先通过词法解析器将SQL拆分为一个个不可再分的单词。再使 用语法解析器对SQL进行理解,并最终提炼出解析上下文。
    Sharding-JDBC采用不同的解析器对SQL进行解析,解析器类型如下:

    • MySQL解析器

    • Oracle解析器

    • SQLServer解析器

    • PostgreSQL解析器

    • 默认SQL解析器 (比如db2,seadb可以使用)

  • 查询优化
    负责合并和优化分片条件,如OR等。Mysql也有自己的查询优化器。如果or的条件有索引,通常会拆分为union

  • SQL路由
    根据解析上下文匹配用户配置的分片策略,并生成路由路径。目前支持分片路由和广播路由。

  • SQL改写
    将SQL改写为在真实数据库中可以正确执行的语句。SQL改写分为正确性改写和优化改写。

  • SQL执行
    通过多线程执行器异步执行SQL。

  • 结果归并
    将多个执行结果集归并以便于通过统一的JDBC接口输出。结果归并包括流式归并、内存归并和使用装饰者模式的追加归并这几种方式。

解析->优化->路由->改写->执行->合并结果

4.3 Sql使用规范

支持项:

①路由至单数据节点时,目前MySQL数据库100%全兼容,其他数据库完善中。 路由至多数据节点时,全面支持DQL、DML、DDL、DCL、TCL。支持分页、去重、排 序、分组、聚合、关联查询(不支持跨库关联)。以下用最为复杂的查询为例:

  1. SELECT select_expr [, select_expr ...]
  2. FROM table_reference [, table_reference ...]
  3. [WHERE predicates]
  4. [GROUP BY {col_name | position} [ASC | DESC], ...]
  5. [ORDER BY {col_name | position} [ASC | DESC], ...]
  6. [LIMIT {[offset,] row_count | row_count OFFSET offset}]

不支持项:

②支持分页子查询,但其他子查询有限支持,无论嵌套多少层,只能解析至第一个包含数据表 的子查询,一旦在下层嵌套中再次找到包含数据表的子查询将直接抛出解析异常。

例如,以下子查询可以支持:

  1. SELECT COUNT(*) FROM (SELECT * FROM b_order o)

以下子查询不支持:

  1. SELECT COUNT(*) FROM (SELECT * FROM b_order o WHERE o.id IN (SELECT id
  2. FROM b_order WHERE status = ?))

③由于归并的限制,子查询中包含聚合函数目前无法支持。

④不支持包含schema的SQL。因为ShardingSphere的理念是像使用一个数据源一样使用多数 据源,因此对SQL的访问都是在同一个逻辑schema之上。

⑤当分片键处于运算表达式或函数中的SQL时,将采用全路由的形式获取结果

例如下面SQL,create_time为分片键:

  1. SELECT * FROM b_order WHERE to_date(create_time, 'yyyy-mm-dd') = '2020-
  2. 05-05';

由于ShardingSphere只能通过SQL字面提取用于分片的值,因此当分片键处于运算表达式 或函数中时,ShardingSphere无法提前获取分片键位于数据库中的值,从而无法计算出真正 的分片值。

不支持的SQL示例:

  1. INSERT INTO tbl_name (col1, col2, …) VALUES(1+2, ?, …) #VALUES语句不支持运算表达式
  2. INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, FROM tbl_name
  3. WHERE col3 = ? #INSERT .. SELECT
  4. SELECT COUNT(col1) as count_alias FROM tbl_name GROUP BY col1 HAVING count_alias > ? #HAVING
  5. SELECT * FROM tbl_name1 UNION SELECT * FROM tbl_name2 #UNION
  6. SELECT * FROM tbl_name1 UNION ALL SELECT * FROM tbl_name2 #UNION ALL
  7. SELECT * FROM ds.tbl_name1 #包含schema
  8. SELECT SUM(DISTINCT col1), SUM(col1) FROM tbl_name #同时使用普通聚合函数和DISTINCT
  9. SELECT * FROM tbl_name WHERE to_date(create_time, yyyy-mm-dd’) = ? #会导致全路由

4.4 分页查询

完全支持MySQL和Oracle的分页查询,SQLServer由于分页查询较为复杂,仅部分支持。

  • 性能瓶颈:
    查询偏移量过大的分页会导致数据库获取数据性能低下,以MySQL为例:
    1. SELECT * FROM b_order ORDER BY id LIMIT 1000000, 10


这句SQL会使得MySQL在无法利用索引的情况下跳过1000000条记录后,再获取10条记录, 其性能可想而知。 而在分库分表的情况下(假设分为2个库),为了保证数据的正确性,SQL 会改写为:

  1. SELECT * FROM b_order ORDER BY id LIMIT 0, 1000010


即将偏移量前的记录全部取出,并仅获取排序后的最后10条记录。这会在数据库本身就执行 很慢的情况下,进一步加剧性能瓶颈。 因为原SQL仅需要传输10条记录至客户端,而改写之 后的SQL则会传输1,000,010 2的记录至客户端。
ShardingSphere的优化
ShardingSphere进行了以下2个方面的优化。
①采用流式处理 + 归并排序的方式来避免内存的过量占用。
②ShardingSphere对仅落至单节点的查询进行进一步优化。
分页方案优化:
由于LIMIT并不能通过索引查询数据,因此*如果可以保证ID的连续性
,通过ID进行分页是比较 好的解决方案:

  1. SELECT * FROM b_order WHERE id > 1000000 AND id <= 1000010 ORDER BY id


或通过记录上次查询结果的最后一条记录的ID进行下一页的查询:

  1. SELECT * FROM b_order WHERE id > 1000000 LIMIT 10


但是我们的id不一定是连续的可能中间删除了:

  1. select * from u_user where id>(select id from u_user limit 10000,1) limit 10

4.5 Inline表达式

InlineShardingStrategy:采用Inline行表达式进行分片的配置。

Inline是可以简化数据节点和分片算法配置信息。主要是解决配置简化、配置一体化。

语法格式:

行表达式的使用非常直观,只需要在配置中使用${ expression }$->{ expression }标识行表达式 即可。例如:

  1. ${begin..end} 表示范围区间
  2. ${[unit1, unit2, unit_x]} 表示枚举值

行表达式中如果出现多个Mysql下笔记 - 图12->{}表达式,整个表达式结果会将每个子表达式结果进行笛卡尔积组合。例如,以下行表达式:

  1. ${['online', 'offline']}_table${1..3}
  2. $->{['online', 'offline']}_table$->{1..3}

最终会解析为:

  1. online_table1, online_table2, online_table3,
  2. offline_table1, offline_table2, offline_table3

数据节点配置: 对于均匀分布的数据节点,如果数据结构如下:

  1. db0
  2. ├── b_order2
  3. └── b_order1
  4. db1
  5. ├── b_order1
  6. └── b_order2

可以简化为:

  1. db${0..1}.b_order${1..2}
  2. 或者
  3. db$->{0..1}.b_order$->{1..2}

对于自定义的数据节点,如果数据结构如下:

  1. db0
  2. ├── b_order0
  3. └── b_order1
  4. db1
  5. ├── b_order2
  6. ├── b_order3
  7. └── b_order4

可以简化为:

  1. db0.b_order${0..1},db1.b_order${2..4}

分片算法配置:

行表达式内部的表达式本质上是一段Groovy代码,可以根据分片键进行计算的方式,返回相应的 真实数据源或真实表名称。

  1. ds${id % 10}
  2. 或者
  3. ds$->{id % 10}

sharding-jdbc 实现了DataSource,Connection,Statement,PrepareStatemnt,ResultSet。

相当于对原来连接池DataSource,Connection,Statement,PrepareStatemnt,ResultSet基础上的封装。

最后crud工作还是要交给原始的Statement去处理。

sharding-jdbc 的自动配置类为SpringBootConfiguration。会向容器中注入DataSource和事务注解扫描器ShardingTransactionTypeScanner。

  1. @Bean
  2. @Conditional(ShardingRuleCondition.class)
  3. public DataSource shardingDataSource() throws SQLException {
  4. return ShardingDataSourceFactory.createDataSource(dataSourceMap, new ShardingRuleConfigurationYamlSwapper().swap(shardingRule), props.getProps());
  5. }
  6. /**
  7. * Get master-slave data source bean.
  8. *
  9. * @return data source bean
  10. * @throws SQLException SQL exception
  11. */
  12. @Bean
  13. @Conditional(MasterSlaveRuleCondition.class)
  14. public DataSource masterSlaveDataSource() throws SQLException {
  15. return MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, new MasterSlaveRuleConfigurationYamlSwapper().swap(masterSlaveRule), props.getProps());
  16. }
  17. /**
  18. * Get encrypt data source bean.
  19. *
  20. * @return data source bean
  21. * @throws SQLException SQL exception
  22. */
  23. @Bean
  24. @Conditional(EncryptRuleCondition.class)
  25. public DataSource encryptDataSource() throws SQLException {
  26. return EncryptDataSourceFactory.createDataSource(dataSourceMap.values().iterator().next(), new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
  27. }
  28. /**
  29. * Get shadow data source bean.
  30. *
  31. * @return data source bean
  32. * @throws SQLException SQL exception
  33. */
  34. @Bean
  35. @Conditional(ShadowRuleCondition.class)
  36. public DataSource shadowDataSource() throws SQLException {
  37. return ShadowDataSourceFactory.createDataSource(dataSourceMap, new ShadowRuleConfigurationYamlSwapper().swap(shadowRule), props.getProps());
  38. }
  39. /**
  40. * Create sharding transaction type scanner.
  41. *
  42. * @return sharding transaction type scanner
  43. */
  44. @Bean
  45. public ShardingTransactionTypeScanner shardingTransactionTypeScanner() {
  46. return new ShardingTransactionTypeScanner();
  47. }

有四种DataSource,但是根据条件判断,只能生成一种DataSource。如果你的配置文件使满足多种,是会报错的。

  1. public final class ShardingRuleCondition extends SpringBootCondition {
  2. @Override
  3. public ConditionOutcome getMatchOutcome(final ConditionContext conditionContext, final AnnotatedTypeMetadata annotatedTypeMetadata) {
  4. boolean isMasterSlaveRule = new MasterSlaveRuleCondition().getMatchOutcome(conditionContext, annotatedTypeMetadata).isMatch();
  5. boolean isEncryptRule = new EncryptRuleCondition().getMatchOutcome(conditionContext, annotatedTypeMetadata).isMatch();
  6. boolean isShadow = new ShadowRuleCondition().getMatchOutcome(conditionContext, annotatedTypeMetadata).isMatch();
  7. return isMasterSlaveRule || isEncryptRule || isShadow ? ConditionOutcome.noMatch("Have found master-slave or encrypt rule in environment") : ConditionOutcome.match();
  8. }
  9. }

如果配置有MasterSlaveRule、EncryptRule、Shadow那就说明不会注入ShardingDataSource。

4.6 主键生成原理

ShardingSphere不仅提供了内置的分布式主键生成器,例如UUID、SNOWFLAKE。

当然还可以实现为org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator接口自定义主键生成器。

内置主键生成器: UUID 采用UUID.randomUUID()的方式产生分布式主键。

SNOWFLAKE 在分片规则配置模块可配置每个表的主键生成策略,默认使用雪花算法,生成64bit的长整型数据。

自定义主键生成器:

①自定义主键类,实现ShardingKeyGenerator接口

②按SPI规范配置自定义主键类 在Apache ShardingSphere中,很多功能实现类的加载方式是通过SPI注入的方式完成的。 注意:在resources目录下新建META-INF文件夹,再新建services文件夹,然后新建文件的 名字为org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator,打开文件,复制自定义主键类全路径到文件中保存。

③自定义主键类应用配置

  1. #对应主键字段名
  2. spring.shardingsphere.sharding.tables.t_book.key-generator.column=id
  3. #对应主键类getType返回内容
  4. spring.shardingsphere.sharding.tables.t_book.keygenerator.type=LAGOUKEY

5 ShardingJdbc实战

工共的配置:ConfigurationPropertyKey

5.1 分库,分库分表

注意点:

  • 广播表不能使用配置的默认的主键生成策略,要单独配置

    • 当我们使用sharding-jdbc提供的主键生生策略的时候,需要将主键生成策略应该配置成 @GeneratedValue(strategy = GenerationType.IDENTITY)。
  • 关联表不能跨库,我试了下将这个pid放到和主表不同的库会查不出来detail的信息的

配置文件:

application.yml配置文件

  1. spring:
  2. jpa:
  3. show-sql: true
  4. database: mysql
  5. hibernate:
  6. naming:
  7. #比如BusinessOrder转成business_order liveCity转成live_city
  8. physical-strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy
  9. implicit-strategy: org.hibernate.boot.model.naming.ImplicitNamingStrategyJpaCompliantImpl
  10. profiles:
  11. active: sharding-jdbc

application-sharding-jdbc.properties配置文件

  1. #打印sql
  2. spring.shardingsphere.props.sql.show=true
  3. #datasource
  4. spring.shardingsphere.datasource.names=ds0,ds1
  5. #数据库连接池类型
  6. spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
  7. spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
  8. #连接信息
  9. spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://localhost:3306/order0?serverTimezone=GMT%2B8
  10. spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.cj.jdbc.Driver
  11. spring.shardingsphere.datasource.ds0.username=root
  12. spring.shardingsphere.datasource.ds0.password=root
  13. spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://localhost:3306/order1?serverTimezone=GMT%2B8
  14. spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.cj.jdbc.Driver
  15. spring.shardingsphere.datasource.ds1.username=root
  16. spring.shardingsphere.datasource.ds1.password=root
  17. #sharding database
  18. #指定分片键
  19. spring.shardingsphere.sharding.tables.position.database-strategy.inline.sharding-column=id
  20. #分片表达式
  21. spring.shardingsphere.sharding.tables.position.database-strategy.inline.algorithm-expression=ds${id%2}
  22. #指定分片键
  23. spring.shardingsphere.sharding.tables.position_detail.database-strategy.inline.sharding-column=pid
  24. #分片表达式
  25. spring.shardingsphere.sharding.tables.position_detail.database-strategy.inline.algorithm-expression=ds${pid%2}
  26. #指定默认id生成策略
  27. spring.shardingsphere.sharding.default-key-generator.column=id
  28. spring.shardingsphere.sharding.default-key-generator.type=SNOWFLAKE
  29. spring.shardingsphere.sharding.default-key-generator.props.worker.id=1
  30. spring.shardingsphere.sharding.default-key-generator.props.max.vibration.offset=2
  31. #指定表id生成策略
  32. spring.shardingsphere.sharding.tables.position.key-generator.column=id
  33. spring.shardingsphere.sharding.tables.position.key-generator.type=SNOWFLAKE
  34. #算法属性 可以看没课策略里面可以配置的属性
  35. spring.shardingsphere.sharding.tables.position.key-generator.props.worker.id=2
  36. spring.shardingsphere.sharding.tables.position.key-generator.props.max.vibration.offset=3
  37. #配置广播表
  38. #多个用逗号隔开
  39. spring.shardingsphere.sharding.broadcast-tables=city
  40. spring.shardingsphere.sharding.tables.city.key-generator.column=id
  41. spring.shardingsphere.sharding.tables.city.key-generator.type=SNOWFLAKE
  42. spring.shardingsphere.sharding.tables.city.key-generator.props.worker.id=2
  43. spring.shardingsphere.sharding.tables.city.key-generator.props.max.vibration.offset=3
  44. #即分库又分表
  45. #根据company_id分库
  46. spring.shardingsphere.sharding.tables.b_order.database-strategy.inline.sharding-column=company_id
  47. spring.shardingsphere.sharding.tables.b_order.database-strategy.inline.algorithm-expression=ds${company_id%2}
  48. #按照id分表
  49. spring.shardingsphere.sharding.tables.b_order.table-strategy.inline.sharding-column=id
  50. spring.shardingsphere.sharding.tables.b_order.table-strategy.inline.algorithm-expression=b_order${id%2}
  51. spring.shardingsphere.sharding.tables.b_order.actual-data-nodes=ds${0..1}.b_order${0..1}

测试类:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = ShardingJdbcApp.class)
  3. public class ShardingJdbcTest {
  4. @Autowired
  5. private PositionRepository positionRepository;
  6. @Autowired
  7. private PositionDetailRepository positionDetailRepository;
  8. @Autowired
  9. private CityRepository cityRepository;
  10. @Autowired
  11. private BOrderRepository bOrderRepository;
  12. @Test
  13. public void addPosition1() {
  14. int len = 20;
  15. List<Position> positions = new ArrayList<>(len);
  16. Position position;
  17. for (long i = len; i > 1; i--) {
  18. position = new Position();
  19. position.setId(i);
  20. position.setName("稻壳儿");
  21. position.setSalary("19999");
  22. position.setLiveCity("东莞");
  23. positions.add(position);
  24. }
  25. positionRepository.saveAll(positions);
  26. }
  27. @Test
  28. //雪花算法生成id
  29. public void addPosition2() {
  30. int len = 10;
  31. List<Position> positions = new ArrayList<>(len);
  32. Position position;
  33. for (long i = len; i > 1; i--) {
  34. position = new Position();
  35. position.setName("花生壳儿");
  36. position.setSalary("29999");
  37. position.setLiveCity("北京");
  38. positions.add(position);
  39. }
  40. positionRepository.saveAll(positions);
  41. }
  42. @Test
  43. public void addPositionDetail1(){
  44. int len = 10;
  45. //注意主键生成策略应该配置成 @GeneratedValue(strategy = GenerationType.IDENTITY)
  46. List<PositionDetail> positionDetails = new ArrayList<>();
  47. PositionDetail positionDetail;
  48. for (long i = 1; i < len; i++) {
  49. positionDetail = new PositionDetail();
  50. positionDetail.setId(i);
  51. positionDetail.setPid(2l);
  52. positionDetail.setDescription("稻壳儿描述信息");
  53. positionDetails.add(positionDetail);
  54. }
  55. positionDetailRepository.saveAll(positionDetails);
  56. }
  57. @Test
  58. public void addPositionAndDetail1(){
  59. int len = 10;
  60. Position position;
  61. PositionDetail positionDetail;
  62. for (long i = len; i > 1; i--) {
  63. position = new Position();
  64. position.setName("花生壳儿");
  65. position.setSalary("29999");
  66. position.setLiveCity("北京");
  67. positionRepository.save(position);
  68. positionDetail = new PositionDetail();
  69. positionDetail.setPid(position.getId());
  70. positionDetail.setDescription("这是备注信息");
  71. positionDetailRepository.save(positionDetail);
  72. }
  73. }
  74. @Test
  75. public void findByIdJoin(){
  76. //关联表不能跨库,我试了下将这个pid放到和主表不同的库会查不出来detail的信息的
  77. Object position = positionRepository.getById(510152504277737472l);
  78. Object[] objects = (Object[])position;
  79. System.out.println(Arrays.toString(objects));
  80. }
  81. @Test
  82. public void addBroadcastTableValue(){
  83. //city为广播表,会向每个库都插入数据
  84. //广播表不能使用配置的默认的主键生成策略,要单独配置
  85. City city = new City();
  86. city.setName("东莞");
  87. city.setProvince("广东");
  88. cityRepository.save(city);
  89. }
  90. @Test
  91. //根据company_id分库 id在同一个库进行分表
  92. public void addBOrder(){
  93. int len = 100;
  94. BOrder bOrder;
  95. for (int i = 0; i < len; i++) {
  96. bOrder = new BOrder();
  97. bOrder.setCompanyId(i);
  98. bOrder.setUserId(3242);
  99. bOrder.setDel(false);
  100. bOrder.setCreateTime(LocalDateTime.now());
  101. bOrder.setOperateTime(LocalDateTime.now());
  102. bOrder.setName("不加葱");
  103. bOrder.setPositionId(2345l);
  104. bOrder.setPositionName("java高级");
  105. bOrder.setPublishUserId(239002);
  106. bOrder.setResumeId(98823);
  107. bOrder.setResumeType(1);
  108. bOrder.setStatus("AUTO_FILTER");
  109. bOrder.setWorkYear("3年");
  110. bOrderRepository.save(bOrder);
  111. }
  112. }
  113. @Test
  114. public void getBOrderById(){
  115. //因为没有指定companyId所以没法确定属于那个库的数据,路由到ds0,ds1都去查询一次
  116. Optional<BOrder> byId = bOrderRepository.findById(510503386764808193l);
  117. System.out.println(byId.get());
  118. }
  119. @Test
  120. public void getBOrderByIdAncCompanyId(){
  121. //指定id 和companyId会锁定唯一库和唯一分片
  122. BOrder result = bOrderRepository.findByIdAndCompanyId(510503386764808193l,4);
  123. System.out.println(result);
  124. }
  125. }

BOrder实体

  1. @Entity(name = "b_order")
  2. public class BOrder {
  3. @Id
  4. @GeneratedValue(strategy = GenerationType.IDENTITY)
  5. private long id;
  6. private Boolean isDel;
  7. private Integer companyId;
  8. private long positionId;
  9. private Integer userId;
  10. private Integer publishUserId;
  11. private Integer resumeType;
  12. private String status;
  13. private LocalDateTime createTime;
  14. private LocalDateTime operateTime;
  15. private String workYear;
  16. private String name;
  17. private String positionName;
  18. private Integer resumeId;
  19. }

City实体

  1. @Entity
  2. public class City implements java.io.Serializable {
  3. @Id
  4. @GeneratedValue(strategy = GenerationType.IDENTITY)
  5. private Long id;
  6. private String name;
  7. private String province;
  8. }

Position实体

  1. @Entity
  2. public class Position implements Serializable {
  3. @Id
  4. @GeneratedValue(strategy = GenerationType.IDENTITY)
  5. private Long id;
  6. private String name;
  7. private String salary;
  8. private String liveCity;
  9. }

PositionDetail实体

  1. @Entity
  2. public class PositionDetail implements java.io.Serializable{
  3. @Id
  4. @GeneratedValue(strategy = GenerationType.IDENTITY)
  5. private Long id;
  6. private Long pid;
  7. private String description;
  8. }

5.2 读写分离

主从架构:读写分离,目的是高可用、读写扩展。主从库内容相同,根据SQL语义进行路由。

分库分表架构:数据分片,目的读写扩展、存储扩容。库和表内容不同,根据分片配置进行路由。

将水平分片和读写分离联合使用,能够更加有效的提升系统性能

下图展现了将分库分表与读写分离一 同使用时,应用程序与数据库集群之间的复杂拓扑关系。

Mysql下笔记 - 图13

读写分离虽然可以提升系统的吞吐量和可用性,但同时也带来了数据不一致的问题,包括多个主库之间 的数据一致性,以及主库与从库之间的数据一致性的问题。 并且,读写分离也带来了与数据分片同样的 问题,它同样会使得应用开发和运维人员对数据库的操作和运维变得更加复杂。

透明化读写分离所带来的影响,让使用方尽量像使用一个数据库一样使用主从数据库集群,是 ShardingSphere读写分离模块的主要设计目标。

  • 核心功能
    提供一主多从的读写分离配置。仅支持单主库,可以支持独立使用,也可以配合分库分表使用
    独立使用读写分离,支持SQL透传。不需要SQL改写流程 同一线程且同一数据库连接内,能保证数据一致性。如果有写入操作,后续的读操作均从主 库读取。
    基于Hint的强制主库路由。可以强制路由走主库查询实时数据,避免主从同步数据延迟。

  • 不支持项
    主库和从库的数据同步
    主库和从库的数据同步延迟
    主库双写或多写
    跨主库和从库之间的事务的数据不一致。建议在主从架构中,事务中的读写均用主库操作。

配置文件:

application.yml

  1. spring:
  2. jpa:
  3. show-sql: true
  4. database: mysql
  5. hibernate:
  6. naming:
  7. physical-strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy
  8. implicit-strategy: org.hibernate.boot.model.naming.ImplicitNamingStrategyJpaCompliantImpl
  9. profiles:
  10. active: master-slave

application-master-slave.properties

  1. #打印sql
  2. spring.shardingsphere.props.sql.show=true
  3. #datasource
  4. spring.shardingsphere.datasource.names=master,slave0
  5. #数据库连接池类型
  6. spring.shardingsphere.datasource.master.type=com.zaxxer.hikari.HikariDataSource
  7. spring.shardingsphere.datasource.slave0.type=com.zaxxer.hikari.HikariDataSource
  8. #连接信息
  9. spring.shardingsphere.datasource.master.jdbc-url=jdbc:mysql://localhost:3306/order0?serverTimezone=GMT%2B8
  10. spring.shardingsphere.datasource.master.driver-class-name=com.mysql.cj.jdbc.Driver
  11. spring.shardingsphere.datasource.master.username=root
  12. spring.shardingsphere.datasource.master.password=root
  13. spring.shardingsphere.datasource.slave0.jdbc-url=jdbc:mysql://localhost:3306/order1?serverTimezone=GMT%2B8
  14. spring.shardingsphere.datasource.slave0.driver-class-name=com.mysql.cj.jdbc.Driver
  15. spring.shardingsphere.datasource.slave0.username=root
  16. spring.shardingsphere.datasource.slave0.password=root
  17. #简单主从配置 只有主从库,没有主库分片
  18. #主库名字
  19. spring.shardingsphere.masterslave.master-data-source-name=master
  20. #从库名字
  21. spring.shardingsphere.masterslave.slave-data-source-names=slave0
  22. #数据源的名字 可以随便指定一个
  23. spring.shardingsphere.masterslave.name=datasource
  24. #从库负载均衡算法类型 RANDOM,ROUND_ROBIN MasterSlaveLoadBalanceAlgorithm可以利用spi机制自己实现
  25. #只会对从库进行负载均衡
  26. spring.shardingsphere.masterslave.load-balance-algorithm-type=RANDOM
  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = ShardingJdbcApp.class)
  3. public class MasterSlaveTest {
  4. @Autowired
  5. private PositionRepository positionRepository;
  6. @Test
  7. public void addPosition(){
  8. Position position = new Position();
  9. position.setLiveCity("东莞");
  10. position.setSalary("199999");
  11. position.setName("Java高级开发");
  12. positionRepository.save(position);
  13. }
  14. @Test
  15. public void find(){
  16. List<Position> all = positionRepository.findAll();
  17. all.stream().forEach(System.out::println);
  18. }
  19. @Test
  20. @Transactional
  21. public void insertAndFind(){
  22. Position position = new Position();
  23. position.setLiveCity("东莞");
  24. position.setSalary("199999");
  25. position.setName("Java高级开发");
  26. positionRepository.save(position);
  27. List<Position> all = positionRepository.findAll();
  28. all.stream().forEach(System.out::println);
  29. }
  30. }

我这里没有实际搭建主从库,是用两个库来模拟的。插入数据确实是往master库添加的,读取数据是从库读取的。不过如果同一个事务内既有增删改操作又有查询操作,走的都是主库。

5.3 设计升级过程

读写分离应用方案

在数据量不是很多的情况下,我们可以将数据库进行读写分离,以应对高并发的需求,通过水平扩展从库,来缓解查询的压力。

Mysql下笔记 - 图14

分表+读写分离

在数据量达到500万的时候,这时数据量预估千万级别,我们可以将数据进行分表存储。

Mysql下笔记 - 图15

分库分表+读写分离

在数据量继续扩大,这时可以考虑分库分表,将数据存储在不同数据库的不同表中。

Mysql下笔记 - 图16

这个配置相对于前面的单独的读写分离有一些复杂

application-sharding-master-slave.properties

  1. #数据源
  2. spring.shardingsphere.datasource.names=master0,slave0,slave1,master1,slave2,slave3
  3. spring.shardingsphere.datasource.master0.type=com.zaxxer.hikari.HikariDataSource
  4. spring.shardingsphere.datasource.master0.driver-class-name=com.mysql.jdbc.Driver
  5. spring.shardingsphere.datasource.master0.jdbc-url=jdbc:mysql://localhost:3306/master0?useUnicode=true&characterEncoding=utf-8&useSSL=false
  6. spring.shardingsphere.datasource.master0.username=root
  7. spring.shardingsphere.datasource.master0.password=root
  8. spring.shardingsphere.datasource.slave0.type=com.zaxxer.hikari.HikariDataSource
  9. spring.shardingsphere.datasource.slave0.driver-class-name=com.mysql.jdbc.Driver
  10. spring.shardingsphere.datasource.slave0.jdbc-url=jdbc:mysql://localhost:3306/slave0?useSSL=false
  11. spring.shardingsphere.datasource.slave0.username=root
  12. spring.shardingsphere.datasource.slave0.password=root
  13. spring.shardingsphere.datasource.slave1.type=com.zaxxer.hikari.HikariDataSource
  14. spring.shardingsphere.datasource.slave1.driver-class-name=com.mysql.jdbc.Driver
  15. spring.shardingsphere.datasource.slave1.jdbc-url=jdbc:mysql://localhost:3306/slave1?useSSL=false
  16. spring.shardingsphere.datasource.slave1.username=root
  17. spring.shardingsphere.datasource.slave1.password=root
  18. spring.shardingsphere.datasource.master1.type=com.zaxxer.hikari.HikariDataSource
  19. spring.shardingsphere.datasource.master1.driver-class-name=com.mysql.jdbc.Driver
  20. spring.shardingsphere.datasource.master1.jdbc-url=jdbc:mysql://localhost:3306/master1?useUnicode=true&characterEncoding=utf-8&useSSL=false
  21. spring.shardingsphere.datasource.master1.username=root
  22. spring.shardingsphere.datasource.master1.password=root
  23. spring.shardingsphere.datasource.slave2.type=com.zaxxer.hikari.HikariDataSource
  24. spring.shardingsphere.datasource.slave2.driver-class-name=com.mysql.jdbc.Driver
  25. spring.shardingsphere.datasource.slave2.jdbc-url=jdbc:mysql://localhost:3306/slave2?useSSL=false
  26. spring.shardingsphere.datasource.slave2.username=root
  27. spring.shardingsphere.datasource.slave2.password=root
  28. spring.shardingsphere.datasource.slave3.type=com.zaxxer.hikari.HikariDataSource
  29. spring.shardingsphere.datasource.slave3.driver-class-name=com.mysql.jdbc.Driver
  30. spring.shardingsphere.datasource.slave3.jdbc-url=jdbc:mysql://localhost:3306/slave3?useSSL=false
  31. spring.shardingsphere.datasource.slave3.username=root
  32. spring.shardingsphere.datasource.slave3.password=root
  33. #分库分表
  34. spring.shardingsphere.sharding.tables.b_order.database-strategy.inline.sharding-column=company_id
  35. spring.shardingsphere.sharding.tables.b_order.database-strategy.inline.algorithm-expression=master$->{company_id % 2}
  36. spring.shardingsphere.sharding.tables.b_order.actual-data-nodes=master$->{0..1}.b_order$->{0..1}
  37. spring.shardingsphere.sharding.tables.b_order.table-strategy.inline.sharding-column=id
  38. spring.shardingsphere.sharding.tables.b_order.table-strategy.inline.algorithm-expression=b_order$->{id % 2}
  39. #读写分离
  40. #主库分片一
  41. spring.shardingsphere.sharding.master-slave-rules.master0.master-data-source-name=master0
  42. #主库分片一的从库
  43. spring.shardingsphere.sharding.master-slave-rules.master0.slave-data-source-names=slave0,slave1
  44. #从库负载均衡算法
  45. spring.shardingsphere.sharding.master-slave-rules.master0.load-balance-algorithm-type=RANDOM
  46. #主库分片二
  47. spring.shardingsphere.sharding.master-slave-rules.master1.master-data-source-name=master1
  48. #主库分片二的从库
  49. spring.shardingsphere.sharding.master-slave-rules.master1.slave-data-source-names=slave2,slave3
  50. #从库负载均衡算法
  51. spring.shardingsphere.sharding.master-slave-rules.master1.load-balance-algorithm-type=RANDOM

5.4 强制路由

配置文件

注意:一个逻辑表不能配置多种sharding-strategy不然启动会报错

application-sharding-hint.properties

  1. #打印sql
  2. spring.shardingsphere.props.sql.show=true
  3. #datasource
  4. spring.shardingsphere.datasource.names=ds0,ds1
  5. #数据库连接池类型
  6. spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
  7. spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
  8. #连接信息
  9. spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://localhost:3306/order0?serverTimezone=GMT%2B8
  10. spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.cj.jdbc.Driver
  11. spring.shardingsphere.datasource.ds0.username=root
  12. spring.shardingsphere.datasource.ds0.password=root
  13. spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://localhost:3306/order1?serverTimezone=GMT%2B8
  14. spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.cj.jdbc.Driver
  15. spring.shardingsphere.datasource.ds1.username=root
  16. spring.shardingsphere.datasource.ds1.password=root
  17. #分库策略
  18. #hint只用指定策略,不用再指定列表,以及表达式了
  19. spring.shardingsphere.sharding.tables.position.database-strategy.hint.algorithm-class-name=com.wh.learn.algorithm.CustomHintStrategy
  20. #指定默认id生成策略
  21. spring.shardingsphere.sharding.default-key-generator.column=id
  22. spring.shardingsphere.sharding.default-key-generator.type=SNOWFLAKE
  23. spring.shardingsphere.sharding.default-key-generator.props.worker.id=1
  24. spring.shardingsphere.sharding.default-key-generator.props.max.vibration.offset=2

自定义的hint的路由策略

  1. public class CustomHintStrategy implements HintShardingAlgorithm<Long> {
  2. @Override
  3. public Collection<String> doSharding(Collection<String> collection, HintShardingValue<Long> hintShardingValue) {
  4. //collection参数为配置的 datasource名称 spring.shardingsphere.datasource.names=ds0,ds1
  5. //HintShardingValue 有三个值logicTableName逻辑表明
  6. //columnName列明 这个是空的
  7. //values 为HintManager中设置的值,,正常来说我们需要根据value值来进行库的选择
  8. //如果返回多个,那么多个库都会写入数据
  9. //我这里的所有的都用第一个
  10. String first = null;
  11. for (String s : collection) {
  12. first = s;
  13. break;
  14. }
  15. return Arrays.asList(first);
  16. }
  17. }

测试类

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = ShardingJdbcApp.class)
  3. public class HintShardingStrategyTest {
  4. @Autowired
  5. private PositionRepository positionRepository;
  6. @Test
  7. public void addPosition(){
  8. HintManager hintManager = HintManager.getInstance();
  9. //如果这里不设置值,就会全路由,在所有分片表中写入数据
  10. //addTableShardingValue(logicTable,value)这个方法设置的值无效
  11. hintManager.setDatabaseShardingValue(2L);
  12. Position position = new Position();
  13. position.setLiveCity("广州");
  14. position.setSalary("399999");
  15. position.setName("java架构");
  16. positionRepository.save(position);
  17. hintManager.close();
  18. }
  19. @Test
  20. public void find(){
  21. HintManager hintManager = HintManager.getInstance();
  22. hintManager.setDatabaseShardingValue(2L);
  23. List<Position> all = positionRepository.findAll();
  24. all.stream().forEach(System.out::println);
  25. }
  26. }

在读写分离结构中,为了避免主从同步数据延迟及时获取刚添加或更新的数据,可以采用强制路由走主库查询实时数据,使用hintManager.setMasterRouteOnly设置主库路由即可。

5.5 数据脱敏剖析

数据脱敏是指对某些敏感信息通过脱敏规则进行数据的变形,实现敏感隐私数据的可靠保护。涉及客户 安全数据或者一些商业性敏感数据,如身份证号、手机号、卡号、客户号等个人信息按照规定,都需要 进行数据脱敏。

数据脱敏模块属于ShardingSphere分布式治理这一核心功能下的子功能模块。

  • 在更新操作时,它通过对用户输入的SQL进行解析,并依据用户提供的脱敏配置对SQL进行改写, 从而实现对原文数据进行加密,并将密文数据存储到底层数据库。
  • 在查询数据时,它又从数据库中取出密文数据,并对其解密,最终将解密后的原始数据返回给用 户。

Apache ShardingSphere自动化&透明化了数据脱敏过程,让用户无需关注数据脱敏的实现细节,像使用普通数据那样使用脱敏数据。

整体架构

ShardingSphere提供的Encrypt-JDBC和业务代码部署在一起。业务方需面向Encrypt-JDBC进行JDBC编 程

Mysql下笔记 - 图17

Encrypt-JDBC将用户发起的SQL进行拦截,并通过SQL语法解析器进行解析、理解SQL行为,再依据用 户传入的脱敏规则,找出需要脱敏的字段和所使用的加解密器对目标字段进行加解密处理后,再与底层 数据库进行交互。

脱敏规则

脱敏配置主要分为四部分:数据源配置,加密器配置,脱敏表配置以及查询属性配置,其详情如下图所 示:

Mysql下笔记 - 图18

  • 数据源配置:指DataSource的配置信息
  • 加密器配置:指使用什么加密策略进行加解密。目前ShardingSphere内置了两种加解密策略: AES/MD5
  • 脱敏表配置:指定哪个列用于存储密文数据(cipherColumn)、哪个列用于存储明文数据 (plainColumn)以及用户想使用哪个列进行SQL编写(logicColumn)
  • 查询属性的配置(query.with.cipher.column):当底层数据库表里同时存储了明文数据、密文数据后,该属性开关用于决定是直 接查询数据库表里的明文数据进行返回,还是查询密文数据通过Encrypt-JDBC解密后返回。

脱敏处理流程

Mysql下笔记 - 图19

下方图片展示了使用Encrypt-JDBC进行增删改查时,其中的处理流程和转换逻辑

Mysql下笔记 - 图20

加密解密解析

ShardingSphere提供了两种加密策略用于数据脱敏,该两种策略分别对应ShardingSphere的两种加解密的接口,即Encryptor和QueryAssistedEncryptor。

  • Encryptor
    该解决方案通过提供encrypt(), decrypt()两种方法对需要脱敏的数据进行加解密。在用户进行 INSERT, DELETE, UPDATE时,ShardingSphere会按照用户配置,对SQL进行解析、改写、路由, 并会调用encrypt()将数据加密后存储到数据库, 而在SELECT时,则调用decrypt()方法将从数据库中取出的脱敏数据进行逆向解密,最终将原始数据返回给用户。 ShardingSphere针对这种类型的脱敏解决方案提供了两种具体实现类,分别是MD5(不可 逆),AES(可逆),用户只需配置即可使用这两种内置的方案。

  • QueryAssistedEncryptor
    相比较于第一种脱敏方案,该方案更为安全和复杂。它的理念是:即使是相同的数据,如两个用户 的密码相同,它们在数据库里存储的脱敏数据也应当是不一样的。这种理念更有利于保护用户信 息,防止撞库成功。
    它提供三种函数进行实现,分别是encrypt(), decrypt(), queryAssistedEncrypt()。在encrypt()阶 段,用户通过设置某个变动种子,例如时间戳。针对原始数据+变动种子组合的内容进行加密,就能保证即使原始数据相同,也因为有变动种子的存在,致使加密后的脱敏数据是不一样的。在 decrypt()可依据之前规定的加密算法,利用种子数据进行解密。queryAssistedEncrypt()用于生成 辅助查询列,用于原始数据的查询过程。
    但是,ShardingSphere针对这种类型的脱敏解决方案并没有提供具体实现类,却将该理念抽象成接口,提供给用户自行实现。ShardingSphere将调用用户提供的该方案的具体实现类进行数据脱敏。

使用

配置文件

application-encryptor.properties

  1. #打印sql
  2. spring.shardingsphere.props.sql.show=true
  3. #datasource
  4. spring.shardingsphere.datasource.names=ds0
  5. #数据库连接池类型
  6. spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
  7. #连接信息
  8. spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://localhost:3306/order0?serverTimezone=GMT%2B8
  9. spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.cj.jdbc.Driver
  10. spring.shardingsphere.datasource.ds0.username=root
  11. spring.shardingsphere.datasource.ds0.password=root
  12. #指定表id生成策略
  13. spring.shardingsphere.sharding.tables.c_user.key-generator.column=id
  14. spring.shardingsphere.sharding.tables.c_user.key-generator.type=SNOWFLAKE
  15. #算法属性 可以看没课策略里面可以配置的属性
  16. spring.shardingsphere.sharding.tables.c_user.key-generator.props.worker.id=2
  17. spring.shardingsphere.sharding.tables.c_user.key-generator.props.max.vibration.offset=3
  18. #encryptor配置
  19. #加密策略類型配置
  20. spring.shardingsphere.encrypt.encryptors.c_user_pwd_encryptor.type=AES
  21. #可以在AESEncryptor这个类里面看所需属性 aes加密解密key
  22. spring.shardingsphere.encrypt.encryptors.c_user_pwd_encryptor.props.aes.key.value=123456
  23. #指定加密策略
  24. spring.shardingsphere.encrypt.tables.c_user.columns.pwd.encryptor=c_user_pwd_encryptor
  25. #可以看 YamlEncryptColumnRuleConfiguration配置类里面的属性
  26. #明文 pwd为逻辑列明 pwd_plain物理存在的明文列 可以不指定物理列,那么物理列就不会存入数据
  27. #spring.shardingsphere.encrypt.tables.c_user.columns.pwd.plain-column=pwd_plain
  28. #密文 物理存在的pwd_cipher密文列
  29. spring.shardingsphere.encrypt.tables.c_user.columns.pwd.cipher-column=pwd_cipher
  30. #默认为true 是否使用加密数据进行查询
  31. spring.shardingsphere.query.with.cipher.column=true

测试类

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = ShardingJdbcApp.class)
  3. public class EncryptorTest {
  4. @Autowired
  5. private CUserRepository cUserRepository;
  6. @Test
  7. public void addCUser(){
  8. CUser cUser = new CUser();
  9. cUser.setName("王哈哈");
  10. cUser.setPwd("root");
  11. cUserRepository.save(cUser);
  12. }
  13. @Test
  14. public void findByPwd(){
  15. //spring.shardingsphere.query.with.cipher.column=true 用密文进行查询
  16. CUser root = cUserRepository.findByPwd("root");
  17. System.out.println(root);
  18. }
  19. }

实体

  1. @Entity(name = "c_user")
  2. public class CUser {
  3. @Id
  4. @GeneratedValue(strategy = GenerationType.IDENTITY)
  5. private Long id;
  6. private String name;
  7. private String pwd;
  8. }

6 分布式理论回顾一下

6.1 CAP和BASE理论

CAP理论:CAP 定理,又被叫作布鲁尔定理。对于共享数据系统,最多只能同时拥有CAP其中的两个,任意两 个都有其适应的场景。

Mysql下笔记 - 图21

BASE理论:BASE 是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。它的核心思想是即使无法做到强一致性(CAP 就是强一致性),但应用可以采用 适合的方式达到最终一致性。

  1. BA指的是基本业务可用性,支持分区失败;
  2. S表示柔性状态,也就是允许短时间内不同步;
  3. E表示最终一致性,数据最终是一致的,但是实时是不一致的。

原子性和持久性必须从根本上保障,为了可用性、性能和服务降级的需要,只有降低一致性和隔离性的要求。BASE 解决了 CAP 理论中没有考虑到的网络延迟问题,在BASE中用软状态和最终一 致,保证了延迟后的一致性。

6.2 2PC (强一致性)

2PC是Two-Phase Commit缩写,即两阶段提交,就是将事务的提交过程分为两个阶段来进行处 理。事务的发起者称协调者,事务的执行者称参与者。协调者统一协调参与者执行。

  • 阶段一 准备阶段
    协调者向所有参与者发送事务内容,询问是否可以提交事务,并等待所有参与者答复。 各参与者执行事务操作,但不提交事务,将 undo 和 redo 信息记入事务日志中。 如参与者执行成功,给协调者反馈 yes;如执行失败,给协调者反馈 no。

  • 阶段二 提交阶段
    如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(rollback)消息; 否则,发送提交(commit)消息。

2PC 方案实现起来简单,实际项目中使用比较少,主要因为以下问题:

  • 性能问题:所有参与者在事务提交阶段处于同步阻塞状态,占用系统资源,容易导致性能瓶 颈。
  • 可靠性问题:如果协调者存在单点故障问题,如果协调者出现故障,参与者将一直处于锁定状态。
  • 数据一致性问题:在阶段 2 中,如果发生局部网络问题,一部分事务参与者收到了提交消 息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。

6.3 3PC (强一致性)

3PC 三阶段提交,是两阶段提交的改进版本,与两阶段提交不同的是,同时在协调者和参与者中都引入超时机制。三阶段提交将两阶段的准备阶段拆分为 2 个阶段,插入了一个 preCommit 阶段,解决了原先在两阶段提交中,参与者在准备之后,由于协调者或参与者发生崩 溃或错误,而导致参与者无法知晓处于长时间等待的问题。

如果在指定的时间内协调者没有收到参与者的消息则默认失败。

  • 阶段一 canCommit
    协调者向参与者发送 canCommit 请求,参与者如果可以提交就返回 yes 响应,否则返回 no 响应。

  • 阶段二 preCommit
    协调者根据阶段一 canCommit 参与者的反应情况执行预提交事务或中断事务操作。

    • 参与者均反馈 yes:协调者向所有参与者发出 preCommit 请求,参与者收到 preCommit 请求后,执行事务操作,但不提交;将 undo 和 redo 信息记入事务日志 中;各参与者向协调者反馈 ack 响应或 no 响应,并等待最终指令。
    • 任何一个参与者反馈 no或等待超时:协调者向所有参与者发出 abort 请求,无论收到 协调者发出的 abort 请求,或者在等待协调者请求过程中出现超时,参与者均会中断本次事务
  • 阶段三 commint
    该阶段进行真正的事务提交,根据阶段二 preCommit反馈的结果完成事务提交或中断操作。

相比2PC模式,3PC模式降低了阻塞范围,在等待超时后协调者或参与者会中断事务。避免了协调者单点问题,阶段三中协调者出现问题时(比如网络中断等),参与者会继续提交事务。

如此,如果在第三阶段协调者如果发送abort请求,但是有的参与者没有收到abort请求出现超时时自动提交事务也会导致数据不一致性的问题。

7 分布式事务整合原理

7.1 XA(强一致性)

两阶段事务提交采用的是X/OPEN组织所定义的DTP模型,通过抽象出来的AP, TM, RM的概念可以保证事务的强一致性。 其中TMRM间采用XA的协议进行双向通信。 与传统的本地事务相比,XA事务增加了prepare阶段,数据库除了被动接受提交指令外,还可以反向通知调用方事务是否可以被提交。 因此TM可以收集所有分支事务的prepare结果,最后进行原子的提交,保证事务的强一致性。基于数据库的XA协议来实现2PC又称为XA方案

Mysql下笔记 - 图22

XA之所以需要引入事务管理器,是因为在分布式系统中,从理论上讲两台机器理论上无法达到一 致的状态,需要引入一个单点进行协调。由全局事务管理器管理和协调的事务,可以跨越多个资源 (数据库)和进程。

事务管理器用来保证所有的事务参与者都完成了准备工作(第一阶段)。如果事务管理器收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了(第二阶段)。MySQL 在这个XA事务中扮演的是RM的角色,而不是事务管理器。

7.2 TCC(最终一致性)

TCC(Try-Confirm-Cancel)的概念,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。TCC 是服务化的两阶段 编程模型,其 Try、Confirm、Cancel 3 个方法均由业务编码实现:

  • Try 操作作为一阶段,负责资源的检查和预留;
  • Confirm 操作作为二阶段提交操作,执行真正的业务;
  • Cancel 是预留资源的取消;

TCC事务模式相对于 XA 等传统模型对比:

Mysql下笔记 - 图23

相比于XA,解决了:

  • 解决了协调者单点,由主业务方发起并完成这个业务活动。业务活动管理器可以变成多点, 引入集群。
  • 同步阻塞:引入超时机制,超时后进行补偿,并且不会锁定整个资源,将资源转换为业务逻辑形式,粒度变小。
  • 数据一致性,有了补偿机制之后,由业务活动管理器控制一致性。

消息队列模式(最终一致性):

消息队列的方案最初是由 eBay 提出,基于TCC模式,消息中间件可以基于 Kafka、RocketMQ 等 消息队列。此方案的核心是将分布式事务拆分成本地事务进行处理,将需要分布式处理的任务通过消息日志的方式来异步执行。消息日志可以存储到本地文本、数据库或MQ中间件,再通过业务规则人工发起重试。

处理流程:

Mysql下笔记 - 图24

  • 步骤1:事务主动方处理本地事务。 事务主动方在本地事务中处理业务更新操作和MQ写消息操作。
  • 步骤 2:事务主动方通过消息中间件,通知事务被动方处理事务通知事务待消息。 事务主动方主动写消息到MQ,事务消费方接收并处理MQ中的消息。
  • 步骤 3:事务被动方通过MQ中间件,通知事务主动方事务已处理的消息,事务主动方根据反馈结果提交或回滚事务。

为了数据的一致性,当流程中遇到错误需要重试,容错处理规则如下:

  • 当步骤 1 处理出错,事务回滚,相当于什么都没发生。
  • 当步骤 2 处理出错,由于未处理的事务消息还是保存在事务发送方,可以重试或撤销本地业务操作。
  • 如果事务被动方消费消息异常,需要不断重试,业务处理逻辑需要保证幂等。
  • 如果是事务被动方业务上的处理失败,可以通过MQ通知事务主动方进行补偿或者事务回滚。
  • 如果多个事务被动方已经消费消息,事务主动方需要回滚事务时需要通知事务被动方回滚。

7.3 Saga模式(最终一致性)

Saga这个概念源于 1987 年普林斯顿大学的 Hecto 和 Kenneth 发表的一篇数据库论文Sagas ,一 个Saga事务是一个有多个短时事务组成的长时的事务。 在分布式事务场景下,我们把一个Saga分 布式事务看做是一个由多个本地事务组成的事务,每个本地事务都有一个与之对应的补偿事务。在 Saga事务的执行过程中,如果某一步执行出现异常,Saga事务会被终止,同时会调用对应的补偿事务完成相关的恢复操作,这样保证Saga相关的本地事务要么都是执行成功,要么通过补偿恢复 成为事务执行之前的状态。(自动反向补偿机制)。

Saga 事务基本协议如下:

  • 每个 Saga 事务由一系列幂等的有序子事务(sub-transaction) Ti 组成。
  • 每个 Ti 都有对应的幂等补偿动作 Ci,补偿动作用于撤销 Ti 造成的结果。

Saga是一种补偿模式,它定义了两种补偿策略:

  • 向前恢复(forward recovery):对应于上面第一种执行顺序,发生失败进行重试,适用于必须要成功的场景。
  • 向后恢复(backward recovery):对应于上面提到的第二种执行顺序,发生错误后撤销掉之前所有成功的子事务,使得整个 Saga 的执行结果撤销。

Mysql下笔记 - 图25

事务正常执行完成:T1, T2, T3, …, Tn,例如:减库存(T1),创建订单(T2),支付(T3),依次有序完 成整个事务。

事务回滚:T1, T2, …, Tj, Cj,…, C2, C1,其中 0 < j < n,例如:减库存(T1),创建订单(T2),支付 (T3),支付失败,支付回滚(C3),订单回滚(C2),恢复库存(C1)。

7.4 Seta框架

Fescar开源项目,最初愿景是能像本地事务一样控制分布式事务,解决分布式环境下的难题。

Seata(Simple Extensible Autonomous Transaction Architecture)是一套一站式分布式事务解决方案,是阿里集团和蚂蚁金服联合打造的分布式事务框架。Seata目前的事务模式有AT、TCC、 Saga和XA,默认是AT模式,AT本质上是2PC协议的一种实现。

Seata AT事务模型包含TM(事务管理器),RM(资源管理器),TC(事务协调器)。其中TC是一个独立 的服务需要单独部署,TM和RM以jar包的方式同业务应用部署在一起,它们同TC建立长连接,在 整个事务生命周期内,保持RPC通信。

  • 全局事务的发起方作为TM,全局事务的参与者作为RM
  • TM负责全局事务的begin和commit/rollback
  • RM负责分支事务的执行结果上报,并且通过TC的协调进行commit/rollback。

Mysql下笔记 - 图26

在 Seata 中,AT时分为两个阶段的,

  • 第一阶段,就是各个阶段本地提交操作;
  • 第二阶段会根据第 一阶段的情况决定是进行全局提交还是全局回滚操作。

具体的执行流程如下:

  • TM 开启分布式事务,负责全局事务的begin和commit/rollback(TM 向 TC 注册全局事务记 录); -
  • RM 作为参与者,负责分支事务的执行结果上报,并且通过TC的协调进行 commit/rollback(RM 向 TC 汇报资源准备状态 );
  • RM分支事务结束,事务一阶段结束;
  • 根据TC 汇总事务信息,由TM发起事务提交或回滚操作
  • TC 通知所有 RM 提交/回滚资源,事务二阶段结束;

7.5 Sharding整合XA原理

Java通过定义JTA接口实现了XA的模型,JTA接口里的ResourceManager需要数据库厂商提供XA的驱动 实现,而TransactionManager则需要事务管理器的厂商实现,传统的事务管理器需要同应用服务器绑 定,因此使用的成本很高。 而嵌入式的事务管器可以以jar包的形式提供服务,同ShardingSphere集成 后,可保证分片后跨库事务强一致性。

ShardingSphere支持以下功能:

  • 支持数据分片后的跨库XA事务
  • 两阶段提交保证操作的原子性和数据的强一致性
  • 服务宕机重启后,提交/回滚中的事务可自动恢复
  • SPI机制整合主流的XA事务管理器,默认Atomikos
  • 同时支持XA和非XA的连接池
  • 提供spring-boot和namespace的接入端

ShardingSphere整合XA事务时,分离了XA事务管理和连接池管理,这样接入XA时,可以做到对业务的零侵入。

Mysql下笔记 - 图27

  • Begin(开启XA全局事务)
    XAShardingTransactionManager会调用具体的XA事务管理器开启XA的全局事务。

  • 执行物理SQL ShardingSphere进行解析/优化/路由后会生成SQL操作,执行引擎为每个物理SQL创建连接的同时,物理连接所对应的XAResource也会被注册到当前XA事务中。事务管理器会在此阶段发送 XAResource.start命令给数据库,数据库在收到XAResource.end命令之前的所有SQL操作,会被 标记为XA事务。

    1. XAResource1.start ## Enlist阶段执行
    2. statement.execute("sql1"); ## 模拟执行一个分片SQL1
    3. statement.execute("sql2"); ## 模拟执行一个分片SQL2
    4. XAResource1.end ## 提交阶段执行


执行这里sql1和sql2将会被标记为XA事务。

  • Commit/rollback(提交XA事务)
    XAShardingTransactionManager收到接入端的提交命令后,会委托实际的XA事务管理进行提交动作,这时事务管理器会收集当前线程里所有注册的XAResource,首先发送XAResource.end指令,用以标记此XA事务的边界。 接着会依次发送prepare指令,收集所有参与XAResource投票, 如果所有XAResource的反馈结果都是OK,则会再次调用commit指令进行最终提交,如果有一个 XAResource的反馈结果为No,则会调用rollback指令进行回滚。 在事务管理器发出提交指令后, 任何XAResource产生的异常都会通过recovery日志进行重试,来保证提交阶段的操作原子性,和数据强一致性。
    1. XAResource1.prepare ## ack: yes
    2. XAResource2.prepare ## ack: yes
    3. XAResource1.commit
    4. XAResource2.commit
    5. XAResource1.prepare ## ack: yes
    6. XAResource2.prepare ## ack: no
    7. XAResource1.rollback
    8. XAResource2.rollback

7.6 Sharding-JDBC整合Saga原理

ShardingSphere的柔性事务已通过第三方servicecomb-saga组件实现的,通过SPI机制注入使用。 ShardingSphere是基于反向SQL技术实现的反向补偿操作,它将对数据库进行更新操作的SQL自动生成 反向SQL,并交由Saga-actuator引擎执行。使用方则无需再关注如何实现补偿方法,将柔性事务管理器 的应用范畴成功的定位回了事务的本源——数据库层面。ShardingSphere支持以下功能:

完全支持跨库事务支持失败SQL重试及最大努力送达支持反向SQL、自动生成更新快照以及自动补偿 默认使用关系型数据库进行快照及事务日志的持久化,支持使用SPI的方式加载其他类型的持久化。

Saga柔性事务的实现类为SagaShardingTransactionMananger, ShardingSphere通过Hook的方式拦截逻辑SQL的解析和路由结果,这样,在分片物理SQL执行前,可以生成逆向SQL,在事务提交阶段再把SQL调用链交给Saga引擎处理。

Mysql下笔记 - 图28

执行流程:

  • Init(Saga引擎初始化) 包含Saga柔性事务的应用启动时,saga-actuator引擎会根据saga.properties的配置进行初始化的 流程。
  • Begin(开启Saga全局事务) 每次开启Saga全局事务时,将会生成本次全局事务的上下文(SagaTransactionContext),事务上下文记录了所有子事务的正向SQL和逆向SQL,作为生成事务调用链的元数据使用。
  • 执行物理SQL 在物理SQL执行前,ShardingSphere根据SQL的类型生成逆向SQL,这里是通过Hook的方式拦截 Parser的解析结果进行实现。
  • Commit/rollback(提交Saga事务) 提交阶段会生成Saga执行引擎所需的调用链路图,commit操作产生ForwardRecovery(正向SQL 补偿)任务,rollback操作产生BackwardRecovery任务(逆向SQL补偿)。

7.7 Sharding-JDBC整合Seata原理

Seata AT事务作为BASE柔性事务的一种实现,可以无缝接入到ShardingSphere生态中。在整合Seata AT事务时,需要把TM,RM,TC的模型融入到ShardingSphere 分布式事务的SPI的生态中。在数据库资源上,Seata通过对接DataSource接口,让JDBC操作可以同TC进行RPC通信。同样, ShardingSphere也是面向DataSource接口对用户配置的物理DataSource进行了聚合,因此把物理 DataSource二次包装为Seata 的DataSource后,就可以把Seata AT事务融入到ShardingSphere的分片 中。

Mysql下笔记 - 图29

  • Init(Seata引擎初始化)
    包含Seata柔性事务的应用启动时,用户配置的数据源会按seata.conf的配置,适配成Seata事务所 需的DataSourceProxy,并且注册到RM中。

  • Begin(开启Seata全局事务)
    TM控制全局事务的边界,TM通过向TC发送Begin指令,获取全局事务ID,所有分支事务通过此全 局事务ID,参与到全局事务中;全局事务ID的上下文存放在当前线程变量中。

  • 执行分片物理SQL
    处于Seata全局事务中的分片SQL通过RM生成undo快照,并且发送participate指令到TC,加入到全局事务中。ShardingSphere的分片物理SQL是按多线程方式执行,因此整合Seata AT事务时, 需要在主线程和子线程间进行全局事务ID的上下文传递,这同服务间的上下文传递思路完全相同。

  • Commit/rollback(提交Seata事务)
    提交Seata事务时,TM会向TC发送全局事务的commit和rollback指令,TC根据全局事务ID协调所有分支事务进行commit和rollback。

8 分布式事务整合

sharding-jdbc和xa,saga,seata整合起来很方便,底层把细节都封装好了。面试的时候主要就是问一下前面的理论以及底层实现的原理。

依赖,用什么引入什么就可以

  1. //XA模式
  2. <dependency>
  3. <groupId>org.apache.shardingsphere</groupId>
  4. <artifactId>sharding-transaction-xa-core</artifactId>
  5. <version>${shardingsphere.version}</version>
  6. </dependency>
  7. //Saga模式
  8. <dependency>
  9. <groupId>io.shardingsphere</groupId>
  10. <artifactId>sharding-transaction-base-saga</artifactId>
  11. <version>${shardingsphere-spi-impl.version}</version>
  12. </dependency>
  13. //Seata模式
  14. <dependency>
  15. <groupId>org.apache.shardingsphere</groupId>
  16. <artifactId>sharding-transaction-base-seata-at</artifactId>
  17. <version>${sharding-sphere.version}</version>
  18. </dependency>

在需要标注事务的方法加@Transactional注解和@ShardingTransactionType然后指定TransactionType

参数配置:

ShardingSphere默认的XA事务管理器为Atomikos,通过在项目的classpath中添加jta.properties 来定制化Atomikos配置项。

  1. #指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
  2. com.atomikos.icatch.enable_logging=true
  3. #JTA/XA资源是否应该自动注册
  4. com.atomikos.icatch.automatic_resource_registration=true
  5. #JTA事务的默认超时时间,默认为10000ms
  6. com.atomikos.icatch.default_jta_timeout=10000
  7. #事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间
  8. com.atomikos.icatch.max_timeout=300000
  9. #指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。
  10. com.atomikos.icatch.threaded_2pc=false
  11. #指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制
  12. com.atomikos.icatch.max_actives=50
  13. #是否支持subtransaction,默认为true
  14. com.atomikos.icatch.allow_subtransactions=true
  15. #指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)
  16. com.atomikos.icatch.serial_jta_transactions=true
  17. #指定JVM关闭时是否强制(force)关闭事务管理器,默认为false
  18. com.atomikos.icatch.force_shutdown_on_vm_exit=false
  19. #在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE
  20. com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807
  21. ===============================================================
  22. ========= 事务日志(Transaction logs)记录配置 =======
  23. ===============================================================
  24. #事务日志目录,默认为./。
  25. com.atomikos.icatch.log_base_dir=./
  26. #事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。
  27. com.atomikos.icatch.log_base_name=tmlog
  28. #指定两次checkpoint的时间间隔,默认为500
  29. com.atomikos.icatch.checkpoint_interval=500
  30. ===============================================================
  31. ========= 事务日志恢复(Recovery)配置 =============
  32. ===============================================================
  33. #指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
  34. com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000
  35. #指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout相同
  36. com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
  37. #提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5
  38. com.atomikos.icatch.oltp_max_retries=5
  39. #提交失败时,每次重试的时间间隔,默认10000ms
  40. com.atomikos.icatch.oltp_retry_interval=10000
  41. ===============================================================
  42. ========= 其他 =============================== ==
  43. ===============================================================
  44. java.naming.factory.initial=com.sun.jndi.rmi.registry.RegistryContextFactory
  45. com.atomikos.icatch.client_demarcation=false
  46. java.naming.provider.url=rmi://localhost:1099
  47. com.atomikos.icatch.rmi_export_class=none
  48. com.atomikos.icatch.trust_client_tm=false

Saga可以通过在项目的classpath中添加 saga.properties 来定制化Saga事务的配置项。配置项 的属性及说明如下:

  1. saga.actuator.executor.size=5 #使用的线程池大小
  2. saga.actuator.transaction.max.retries=5 #失败SQL的最大重试次数
  3. saga.actuator.compensation.max.retries=5 #失败SQL的最大尝试补偿次数
  4. saga.actuator.transaction.retry.delay.milliseconds=5000 #失败SQL的重试间隔,单位毫秒
  5. saga.actuator.compensation.retry.delay.milliseconds=3000 #失败SQL的补偿间隔,单位毫秒
  6. saga.persistence.enabled=false #是否对日志进行持久化
  7. saga.persistence.ds.url="jdbc:mysql://localhost:3306/order0" 无事务日志数据库JDBC连接
  8. saga.persistence.ds.username="root" #无事务日志数据库用户名
  9. saga.persistence.ds.password="root" #无 事务日志数据库密码
  10. saga.persistence.ds.max.pool.size=50 #事务日志连接池最大连接数
  11. saga.persistence.ds.min.pool.size=1 #事务日志连接池最小连接数
  12. saga.persistence.ds.max.life.time.milliseconds=0 #(0无限制)事务日志连接池最大存活时间,单位毫秒
  13. saga.persistence.ds.idle.timeout.milliseconds=60*1000 #事务日志连接池空闲回收时间,单位毫秒
  14. saga.persistence.ds.connection.timeout.milliseconds=30*1000 #事务日志连接池超时时间,单位毫秒

8.1 Seata

AT模式

前提

  • 基于支持本地 ACID 事务的关系型数据库。
  • Java 应用,通过 JDBC 访问数据库。

整体机制:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段:

    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

写隔离:

  • 一阶段本地事务提交前,需要确保先拿到 全局锁
  • 拿不到 全局锁 ,不能提交本地事务。
  • 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

以一个示例来说明:

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁

Mysql下笔记 - 图30

如果tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

Mysql下笔记 - 图31

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

读隔离:

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)

如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

Mysql下笔记 - 图32

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

工作机制:

以一个示例来说明整个 AT 分支的工作过程。

业务表:product

Field Type Key
id bigint(20) PRI
name varchar(100)
since varchar(100)

AT 分支事务的业务逻辑:

update product set name = ‘GTS’ where name = ‘TXC’;

一阶段

过程:

  1. 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
  2. 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
  1. select id, name, since from product where name = 'TXC';

得到前镜像:

id name since
1 TXC 2014
  1. 执行业务 SQL:更新这条记录的 name 为 ‘GTS’。

  2. 查询后镜像:根据前镜像的结果,通过 主键 定位数据。

  1. select id, name, since from product where id = 1;

得到后镜像:

id name since
1 GTS 2014
  1. 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中
    1. {
    2. "branchId": 641789253,
    3. "undoItems": [{
    4. "afterImage": {
    5. "rows": [{
    6. "fields": [{
    7. "name": "id",
    8. "type": 4,
    9. "value": 1
    10. }, {
    11. "name": "name",
    12. "type": 12,
    13. "value": "GTS"
    14. }, {
    15. "name": "since",
    16. "type": 12,
    17. "value": "2014"
    18. }]
    19. }],
    20. "tableName": "product"
    21. },
    22. "beforeImage": {
    23. "rows": [{
    24. "fields": [{
    25. "name": "id",
    26. "type": 4,
    27. "value": 1
    28. }, {
    29. "name": "name",
    30. "type": 12,
    31. "value": "TXC"
    32. }, {
    33. "name": "since",
    34. "type": 12,
    35. "value": "2014"
    36. }]
    37. }],
    38. "tableName": "product"
    39. },
    40. "sqlType": "UPDATE"
    41. }],
    42. "xid": "xid:xxx"
    43. }
  1. 提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁
  2. 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
  3. 将本地事务提交的结果上报给 TC。

二阶段回滚

  1. 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。

  2. 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。

  3. 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。

  4. 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:

    1. update product set name = 'TXC' where id = 1;
  1. 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

二阶段-提交

  1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
  2. 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

UNDO_LOG Table:不同数据库在类型上会略有差别。

mysql为例:

  1. -- 注意此处0.7.0+ 增加字段 context
  2. CREATE TABLE `undo_log` (
  3. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  4. `branch_id` bigint(20) NOT NULL,
  5. `xid` varchar(100) NOT NULL,
  6. `context` varchar(128) NOT NULL,
  7. `rollback_info` longblob NOT NULL,
  8. `log_status` int(11) NOT NULL,
  9. `log_created` datetime NOT NULL,
  10. `log_modified` datetime NOT NULL,
  11. PRIMARY KEY (`id`),
  12. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  13. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

TCC 模式

回顾总览中的描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为

根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction ModeManual (Branch) Transaction Mode.

AT 模式基于 支持本地 ACID 事务关系型数据库

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

SAGA 模式

Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。

Mysql下笔记 - 图33

适用场景:

  • 业务流程长、业务流程多
  • 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口

优势:

  • 一阶段提交本地事务,无锁,高性能
  • 事件驱动架构,参与者可异步执行,高吞吐
  • 补偿服务易于实现

缺点:

  • 不保证隔离性

9 Spi机制

在Apache ShardingSphere中,很多功能实现类的加载方式是通过SPI注入的方式完成的。 Service Provider Interface (SPI)是一种为了被第三方实现或扩展的API,它可以用于实现框架扩展或组件替换。

Apache ShardingSphere之所以采用SPI方式进行扩展,是出于整体架构最优设计考虑。 为了让高级用户通过实现Apache ShardingSphere提供的相应接口,动态将用户自定义的实现类加载其中,从而在保持Apache ShardingSphere架构完整性与功能稳定性的情况下,满足用户不同场景的实际需求。

本章节汇总了Apache ShardingSphere所有通过SPI方式载入的功能模块。 如无特殊需求,用户可以使用Apache ShardingSphere提供的内置实现,并通过简单配置即可实现相应功能;高级用户则可以参考各个功能模块的接口进行自定义实现。

Mysql下笔记 - 图34

9.1 SQL解析

SQL解析的接口用于规定用于解析SQL的ANTLR语法文件。

主要接口是SQLParserEntry,其内置实现类有MySQLParserEntry, PostgreSQLParserEntry, SQLServerParserEntryOracleParserEntry

9.2 数据库协议

数据库协议的接口用于Sharding-Proxy解析与适配访问数据库的协议。

主要接口是DatabaseProtocolFrontendEngine,其内置实现类有MySQLProtocolFrontendEnginePostgreSQLProtocolFrontendEngine

9.3 数据脱敏

数据脱敏的接口用于规定加解密器的加密、解密、类型获取、属性设置等方式。

主要接口有两个:ShardingEncryptorShardingQueryAssistedEncryptor,其中ShardingEncryptor的内置实现类有AESShardingEncryptorMD5ShardingEncryptor

9.4 分布式主键

分布式主键的接口主要用于规定如何生成全局性的自增、类型获取、属性设置等。

主要接口为ShardingKeyGenerator,其内置实现类有UUIDShardingKeyGeneratorSnowflakeShardingKeyGenerator

9.5 分布式事务

分布式事务的接口主要用于规定如何将分布式事务适配为本地事务接口。

主要接口为ShardingTransactionManager,其内置实现类有XAShardingTransactionManagerSeataATShardingTransactionManager

org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine#loadShardingTransactionManager

  1. private void loadShardingTransactionManager() {
  2. for (ShardingTransactionManager each : ServiceLoader.load(ShardingTransactionManager.class)) {
  3. if (transactionManagerMap.containsKey(each.getTransactionType())) {
  4. log.warn("Find more than one {} transaction manager implementation class, use `{}` now",
  5. each.getTransactionType(), transactionManagerMap.get(each.getTransactionType()).getClass().getName());
  6. continue;
  7. }
  8. transactionManagerMap.put(each.getTransactionType(), each);
  9. }
  10. }

9.6 XA事务管理器

XA事务管理器的接口主要用于规定如何将XA事务的实现者适配为统一的XA事务接口。

主要接口为XATransactionManager,其内置实现类有AtomikosTransactionManager, NarayanaXATransactionManagerBitronixXATransactionManager

9.7 注册中心

注册中心的接口主要用于规定注册中心初始化、存取数据、更新数据、监控等行为。

主要接口为RegistryCenter,其内置实现类有Zookeeper。

10 Sharding-Jdbc 编排治理

编排治理模块提供配置中心/注册中心(以及规划中的元数据中心)、配置动态化、数据库熔断禁用、调用链路等治理能力。

10.1 配置中心

解决问题:

  • 配置集中化:越来越多的运行时实例,使得散落的配置难于管理,配置不同步导致的问题十分严重。将配置集中于配置中心,可以更加有效进行管理。
  • 配置动态化:配置修改后的分发,是配置中心可以提供的另一个重要能力。它可支持数据源、表与分片及读写分离策略的动态切换。

配置中心数据结构:

配置中心在定义的命名空间的config下,以YAML格式存储,包括数据源,数据分片,读写分离、Properties配置,可通过修改节点来实现对于配置的动态管理。

  1. config
  2. ├──authentication # Sharding-Proxy权限配置
  3. ├──props # 属性配置
  4. ├──schema # Schema配置
  5. ├──sharding_db # SchemaName配置
  6. ├──datasource # 数据源配置
  7. ├──rule # 数据分片规则配置
  8. ├──masterslave_db # SchemaName配置
  9. ├──datasource # 数据源配置
  10. ├──rule # 读写分离规则

config/authentication

  1. password: root
  2. username: root

config/sharding/props

相对于sharding-sphere配置里面的Sharding Properties。

  1. executor.size: 20
  2. sql.show: true

config/schema/schemeName/datasource

多个数据库连接池的集合,不同数据库连接池属性自适配(例如:DBCP,C3P0,Druid, HikariCP)。

  1. ds_0: !!org.apache.shardingsphere.orchestration.yaml.YamlDataSourceConfiguration
  2. dataSourceClassName: com.zaxxer.hikari.HikariDataSource
  3. properties:
  4. url: jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false
  5. password: null
  6. maxPoolSize: 50
  7. maintenanceIntervalMilliseconds: 30000
  8. connectionTimeoutMilliseconds: 30000
  9. idleTimeoutMilliseconds: 60000
  10. minPoolSize: 1
  11. username: root
  12. maxLifetimeMilliseconds: 1800000
  13. ds_1: !!org.apache.shardingsphere.orchestration.yaml.YamlDataSourceConfiguration
  14. dataSourceClassName: com.zaxxer.hikari.HikariDataSource
  15. properties:
  16. url: jdbc:mysql://127.0.0.1:3306/demo_ds_1?serverTimezone=UTC&useSSL=false
  17. password: null
  18. maxPoolSize: 50
  19. maintenanceIntervalMilliseconds: 30000
  20. connectionTimeoutMilliseconds: 30000
  21. idleTimeoutMilliseconds: 60000
  22. minPoolSize: 1
  23. username: root
  24. maxLifetimeMilliseconds: 1800000

config/schema/sharding_db/rule

数据分片配置,包括数据分片 + 读写分离配置。

  1. tables:
  2. t_order:
  3. actualDataNodes: ds_$->{0..1}.t_order_$->{0..1}
  4. databaseStrategy:
  5. inline:
  6. shardingColumn: user_id
  7. algorithmExpression: ds_$->{user_id % 2}
  8. keyGenerator:
  9. column: order_id
  10. logicTable: t_order
  11. tableStrategy:
  12. inline:
  13. shardingColumn: order_id
  14. algorithmExpression: t_order_$->{order_id % 2}
  15. t_order_item:
  16. actualDataNodes: ds_$->{0..1}.t_order_item_$->{0..1}
  17. databaseStrategy:
  18. inline:
  19. shardingColumn: user_id
  20. algorithmExpression: ds_$->{user_id % 2}
  21. keyGenerator:
  22. column: order_item_id
  23. logicTable: t_order_item
  24. tableStrategy:
  25. inline:
  26. shardingColumn: order_id
  27. algorithmExpression: t_order_item_$->{order_id % 2}
  28. bindingTables:
  29. - t_order,t_order_item
  30. broadcastTables:
  31. - t_config
  32. defaultDataSourceName: ds_0
  33. masterSlaveRules: {}

config/schema/masterslave/rule

读写分离独立使用时使用该配置。

  1. name: ds_ms
  2. masterDataSourceName: ds_master
  3. slaveDataSourceNames:
  4. - ds_slave0
  5. - ds_slave1
  6. loadBalanceAlgorithmType: ROUND_ROBIN

动态生效:

在注册中心上修改、删除、新增相关配置,会动态推送到生产环境并立即生效。

10.2 注册中心

解决问题

  • 相对于配置中心管理配置数据,注册中心存放运行时的动态/临时状态数据,比如可用的proxy的实例,需要禁用或熔断的datasource实例。
  • 通过注册中心,可以提供熔断数据库访问程序对数据库的访问和禁用从库的访问的编排治理能力。治理仍然有大量未完成的功能(比如流控等)。

注册中心数据结构:

注册中心在定义的命名空间的state下,创建数据库访问对象运行节点,用于区分不同数据库访问实例。包括instances和datasources节点。

  1. instances
  2. ├──your_instance_ip_a@-@your_instance_pid_x
  3. ├──your_instance_ip_b@-@your_instance_pid_y
  4. ├──....
  5. datasources
  6. ├──ds0
  7. ├──ds1
  8. ├──....

Sharding-Proxy支持多逻辑数据源,因此datasources子节点的名称采用schema_name.data_source_name的形式。

  1. instances
  2. ├──your_instance_ip_a@-@your_instance_pid_x
  3. ├──your_instance_ip_b@-@your_instance_pid_y
  4. ├──....
  5. datasources
  6. ├──sharding_db.ds0
  7. ├──sharding_db.ds1
  8. ├──....

state/instances

数据库访问对象运行实例信息,子节点是当前运行实例的标识。 运行实例标识由运行服务器的IP地址和PID构成。运行实例标识均为临时节点,当实例上线时注册,下线时自动清理。 注册中心监控这些节点的变化来治理运行中实例对数据库的访问等。

state/datasources

可以治理读写分离从库,可动态添加删除以及禁用。

熔断实例:

可在IP地址@-@PID节点写入DISABLED(忽略大小写)表示禁用该实例,删除DISABLED表示启用。

Zookeeper命令如下:

  1. [zk: localhost:2181(CONNECTED) 0] set /your_zk_namespace/your_app_name/state/instances/your_instance_ip_a@-@your_instance_pid_x DISABLED

禁用从库:

在读写分离(或数据分片+读写分离)场景下,可在数据源名称子节点中写入DISABLED(忽略大小写)表示禁用从库数据源,删除DISABLED或节点表示启用。

Zookeeper命令如下:

  1. [zk: localhost:2181(CONNECTED) 0] set /your_zk_namespace/your_app_name/state/datasources/your_slave_datasource_name DISABLED

10.3支持的配置中心和注册中心

ShardingSphere在数据库治理模块使用SPI方式载入数据到配置中心/注册中心,进行实例熔断和 数据库禁用。 目前,ShardingSphere内部支持Zookeeper和Etcd这种常用的配置中心/注册中 心。 此外,您可以使用其他第三方配置中心/注册中心,例如Apollo、Nacos等,并通过SPI的方式 注入到ShardingSphere,从而使用该配置中心/注册中心,实现数据库治理功能。

看一下sharding-proxy源码怎么用的就可以。

使用的时候需要引入

这个是做版本控制的pom

  1. <!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/sharding-orchestration-core -->
  2. <dependency>
  3. <groupId>org.apache.shardingsphere</groupId>
  4. <artifactId>sharding-orchestration-core</artifactId>
  5. <version>4.0.1</version>
  6. <type>pom</type>
  7. </dependency>

下面对应的版本好像还没有实现,最早有4.0.1

  1. <!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/sharding-orchestration-reg-zookeeper-curator -->
  2. <dependency>
  3. <groupId>org.apache.shardingsphere</groupId>
  4. <artifactId>sharding-orchestration-reg-zookeeper-curator</artifactId>
  5. <version>4.0.1</version>
  6. </dependency>

配置中心

  1. <!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/sharding-orchestration-core-configuration -->
  2. <dependency>
  3. <groupId>org.apache.shardingsphere</groupId>
  4. <artifactId>sharding-orchestration-core-configuration</artifactId>
  5. <version>4.1.1</version>
  6. </dependency>

注册中心

  1. <!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/sharding-orchestration-core-registrycenter -->
  2. <dependency>
  3. <groupId>org.apache.shardingsphere</groupId>
  4. <artifactId>sharding-orchestration-core-registrycenter</artifactId>
  5. <version>4.1.0</version>
  6. </dependency>

10.4 性能监控

APM是应用性能监控的缩写。目前APM的主要功能着眼于分布式系统的性能诊断,其主要功能包括调用链展示,应用拓扑分析等

ShardingSphere并不负责如何采集、存储以及展示应用性能监控的相关数据,而是将SQL解析与SQL执行这两块数据分片的最核心的相关信息发送至应用性能监控系统,并交由其处理。 换句话说,ShardingSphere仅负责产生具有价值的数据,并通过标准协议递交至相关系统。ShardingSphere可以通过两种方式对接应用性能监控系统。

第一种方式是使用OpenTracing API发送性能追踪数据。面向OpenTracing协议的APM产品都可以和ShardingSphere自动对接,比如SkyWalking,Zipkin和Jaeger。使用这种方式只需要在启动时配置OpenTracing协议的实现者即可。 它的优点是可以兼容所有的与OpenTracing协议兼容的产品作为APM的展现系统,如果采用公司愿意实现自己的APM系统,也只需要实现OpenTracing协议,即可自动展示ShardingSphere的链路追踪信息。 缺点是OpenTracing协议发展并不稳定,较新的版本实现者较少,且协议本身过于中立,对于个性化的相关产品的实现不如原生支持强大。

第二种方式是使用SkyWalking的自动探针。 ShardingSphere团队与SkyWalking团队共同合作,在SkyWalking中实现了ShardingSphere自动探针,可以将相关的应用性能数据自动发送到SkyWalking中。

使用OpenTracing协议

  • 方法1:通过读取系统参数注入APM系统提供的Tracer实现类

启动时添加参数

  1. -Dorg.apache.shardingsphere.opentracing.tracer.class=org.apache.skywalking.apm.toolkit.opentracing.SkywalkingTracer

调用初始化方法

  1. ShardingTracer.init();
  • 方法2:通过参数注入APM系统提供的Tracer实现类
  1. ShardingTracer.init(new SkywalkingTracer());

注意:使用SkyWalking的OpenTracing探针时,应将原ShardingSphere探针插件禁用,以防止两种插件互相冲突

11 sharding-proxy

Sharding-Proxy是ShardingSphere的第二个产品,定位为透明化的数据库代理端,提供封装了数据库 二进制协议的服务端版本,用于完成对异构语言的支持。 目前先提供MySQL版本,它可以使用任何兼 容MySQL协议的访问客户端(如:MySQL Command Client, MySQL Workbench等操作数据,对DBA更加友好。

  • 向应用程序完全透明,可直接当做MySQL使用
  • 适用于任何兼容MySQL协议的客户端

Mysql下笔记 - 图35

10.1 使用

Proxy启动:

  1. 下载Sharding-Proxy的最新发行版。

  2. 如果使用docker,可以执行docker pull shardingsphere/sharding-proxy获取镜像。详细信息请参考Docker镜像

  3. 解压缩后修改conf/server.yaml和以config-前缀开头的文件,如:conf/config-xxx.yaml文件,进行分片规则、读写分离规则配置. 配置方式请参考配置手册

  4. Linux操作系统请运行bin/start.sh,Windows操作系统请运行bin/start.bat启动Sharding-Proxy。
    配置端口启动:${sharding-proxy}\bin\start.sh ${port}

  5. 使用任何PostgreSQL的客户端连接。如: psql -U root -h 127.0.0.1 -p 3307

注册中心使用:

若想使用Sharding-Proxy的数据库治理功能,则需要使用注册中心实现实例熔断和从库禁用功能。

Zookeeper

  1. Sharding-Proxy默认提供了Zookeeper的注册中心解决方案。

其他第三方注册中心:

  1. 将Sharding-Proxy的lib目录下的sharding-orchestration-reg-zookeeper-curator-${sharding-sphere.version}.jar文件删除。
  2. 使用SPI方式实现相关逻辑编码,并将生成的jar包放到Sharding-Proxy的lib目录下。
  3. 按照配置规则进行注册中心的配置,即可使用。

使用自定义分片算法:

当用户需要使用自定义的分片算法类时,无法再通过简单的inline表达式在yaml文件进行配置。可通过以下方式配置使用自定义分片算法。

  1. 实现ShardingAlgorithm接口定义的算法实现类。
  2. 将上述java文件打包成jar包。
  3. 将上述jar包拷贝至ShardingProxy解压后的conf/lib目录下。
  4. 将上述自定义算法实现类的java文件引用配置在yaml文件里tableRule的algorithmClassName属性上,具体可参考配置规则

分布式事务:

Sharding-Proxy接入的分布式事务API同Sharding-JDBC保持一致,支持LOCAL,XA,BASE类型的事务。

  • XA事务
    Sharding-Proxy原生支持XA事务,默认的事务管理器为Atomikos。 可以通过在Sharding-Proxy的conf目录中添加jta.properties来定制化Atomikos配置项。 具体的配置规则请参考Atomikos的官方文档

  • BASE事务
    BASE目前没有打包到Sharding-Proxy中,使用时需要将实现了ShardingTransactionManagerSPI的jar拷贝至conf/lib目录,然后切换事务类型为BASE。

SCTL (Sharding-Proxy control language):

SCTL为Sharding-Proxy特有的控制语句,可以在运行时修改和查询Sharding-Proxy的状态,目前支持的语法为:

语句 说明
sctl:set transaction_type=XX 修改当前TCP连接的事务类型, 支持LOCAL,XA,BASE。例:sctl:set transaction_type=XA
sctl:show transaction_type 查询当前TCP连接的事务类型
sctl:show cached_connections 查询当前TCP连接中缓存的物理数据库连接个数
sctl:explain SQL语句 查看逻辑SQL的执行计划,例:sctl:explain select * from t_order;
sctl:hint set MASTER_ONLY=true 针对当前TCP连接,是否将数据库操作强制路由到主库
sctl:hint set DatabaseShardingValue=yy 针对当前TCP连接,设置hint仅对数据库分片有效,并添加分片值,yy:数据库分片值
sctl:hint addDatabaseShardingValue xx=yy 针对当前TCP连接,为表xx添加分片值yy,xx:逻辑表名称,yy:数据库分片值
sctl:hint addTableShardingValue xx=yy 针对当前TCP连接,为表xx添加分片值yy,xx:逻辑表名称,yy:表分片值
sctl:hint clear 针对当前TCP连接,清除hint所有设置
sctl:hint show status 针对当前TCP连接,查询hint状态,master_only:true/false,sharding_type:databases_only/databases_tables
sctl:hint show table status 针对当前TCP连接,查询逻辑表的hint分片值

Sharding-Proxy 默认不支持hint,如需支持,请在conf/server.yaml中,将props的属性proxy.hint.enabled设置为true。在Sharding-Proxy中,HintShardingAlgorithm的泛型只能是String类型。

注意事项:

  1. Sharding-Proxy默认使用3307端口,可以通过启动脚本追加参数作为启动端口号。如: bin/start.sh 3308
  2. Sharding-Proxy使用conf/server.yaml配置注册中心、认证信息以及公用属性。
  3. Sharding-Proxy支持多逻辑数据源,每个以config-前缀命名的yaml配置文件,即为一个逻辑数据源。
  4. Sharding-Proxy 默认不支持hint,如需支持,请在conf/server.yaml中,将props的属性 proxy.hint.enabled设置为true。在Sharding-Proxy中,HintShardingAlgorithm的泛型只能是 String类型。

10.2 配置文件

数据源与分片配置示例

Sharding-Proxy支持多逻辑数据源,每个以config-前缀命名的yaml配置文件,即为一个逻辑数据源。以下是config-xxx.yaml的配置配置示例。

数据分片

dataSources:

  1. schemaName: sharding_db
  2. dataSources:
  3. ds0:
  4. url: jdbc:postgresql://localhost:5432/ds0
  5. username: root
  6. password:
  7. connectionTimeoutMilliseconds: 30000
  8. idleTimeoutMilliseconds: 60000
  9. maxLifetimeMilliseconds: 1800000
  10. maxPoolSize: 65
  11. ds1:
  12. url: jdbc:postgresql://localhost:5432/ds1
  13. username: root
  14. password:
  15. connectionTimeoutMilliseconds: 30000
  16. idleTimeoutMilliseconds: 60000
  17. maxLifetimeMilliseconds: 1800000
  18. maxPoolSize: 65
  19. shardingRule:
  20. tables:
  21. t_order:
  22. actualDataNodes: ds${0..1}.t_order${0..1}
  23. databaseStrategy:
  24. inline:
  25. shardingColumn: user_id
  26. algorithmExpression: ds${user_id % 2}
  27. tableStrategy:
  28. inline:
  29. shardingColumn: order_id
  30. algorithmExpression: t_order${order_id % 2}
  31. keyGenerator:
  32. type: SNOWFLAKE
  33. column: order_id
  34. t_order_item:
  35. actualDataNodes: ds${0..1}.t_order_item${0..1}
  36. databaseStrategy:
  37. inline:
  38. shardingColumn: user_id
  39. algorithmExpression: ds${user_id % 2}
  40. tableStrategy:
  41. inline:
  42. shardingColumn: order_id
  43. algorithmExpression: t_order_item${order_id % 2}
  44. keyGenerator:
  45. type: SNOWFLAKE
  46. column: order_item_id
  47. bindingTables:
  48. - t_order,t_order_item
  49. defaultTableStrategy:
  50. none:

读写分离

  1. schemaName: master_slave_db
  2. dataSources:
  3. ds_master:
  4. url: jdbc:postgresql://localhost:5432/ds_master
  5. username: root
  6. password:
  7. connectionTimeoutMilliseconds: 30000
  8. idleTimeoutMilliseconds: 60000
  9. maxLifetimeMilliseconds: 1800000
  10. maxPoolSize: 65
  11. ds_slave0:
  12. url: jdbc:postgresql://localhost:5432/ds_slave0
  13. username: root
  14. password:
  15. connectionTimeoutMilliseconds: 30000
  16. idleTimeoutMilliseconds: 60000
  17. maxLifetimeMilliseconds: 1800000
  18. maxPoolSize: 65
  19. ds_slave1:
  20. url: jdbc:postgresql://localhost:5432/ds_slave1
  21. username: root
  22. password:
  23. connectionTimeoutMilliseconds: 30000
  24. idleTimeoutMilliseconds: 60000
  25. maxLifetimeMilliseconds: 1800000
  26. maxPoolSize: 65
  27. masterSlaveRule:
  28. name: ds_ms
  29. masterDataSourceName: ds_master
  30. slaveDataSourceNames:
  31. - ds_slave0
  32. - ds_slave1

数据脱敏

  1. schemaName: encrypt_db
  2. dataSource:
  3. url: jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false
  4. username: root
  5. password:
  6. connectionTimeoutMilliseconds: 30000
  7. idleTimeoutMilliseconds: 60000
  8. maxLifetimeMilliseconds: 1800000
  9. maxPoolSize: 50
  10. encryptRule:
  11. encryptors:
  12. encryptor_aes:
  13. type: aes
  14. props:
  15. aes.key.value: 123456abc
  16. encryptor_md5:
  17. type: md5
  18. tables:
  19. t_encrypt:
  20. columns:
  21. user_id:
  22. plainColumn: user_plain
  23. cipherColumn: user_cipher
  24. encryptor: encryptor_aes
  25. order_id:
  26. cipherColumn: order_cipher
  27. encryptor: encryptor_md5

数据分片 + 读写分离

  1. schemaName: sharding_master_slave_db
  2. dataSources:
  3. ds0:
  4. url: jdbc:postgresql://localhost:5432/ds0
  5. username: root
  6. password:
  7. connectionTimeoutMilliseconds: 30000
  8. idleTimeoutMilliseconds: 60000
  9. maxLifetimeMilliseconds: 1800000
  10. maxPoolSize: 65
  11. ds0_slave0:
  12. url: jdbc:postgresql://localhost:5432/ds0_slave0
  13. username: root
  14. password:
  15. connectionTimeoutMilliseconds: 30000
  16. idleTimeoutMilliseconds: 60000
  17. maxLifetimeMilliseconds: 1800000
  18. maxPoolSize: 65
  19. ds0_slave1:
  20. url: jdbc:postgresql://localhost:5432/ds0_slave1
  21. username: root
  22. password:
  23. connectionTimeoutMilliseconds: 30000
  24. idleTimeoutMilliseconds: 60000
  25. maxLifetimeMilliseconds: 1800000
  26. maxPoolSize: 65
  27. ds1:
  28. url: jdbc:postgresql://localhost:5432/ds1
  29. username: root
  30. password:
  31. connectionTimeoutMilliseconds: 30000
  32. idleTimeoutMilliseconds: 60000
  33. maxLifetimeMilliseconds: 1800000
  34. maxPoolSize: 65
  35. ds1_slave0:
  36. url: jdbc:postgresql://localhost:5432/ds1_slave0
  37. username: root
  38. password:
  39. connectionTimeoutMilliseconds: 30000
  40. idleTimeoutMilliseconds: 60000
  41. maxLifetimeMilliseconds: 1800000
  42. maxPoolSize: 65
  43. ds1_slave1:
  44. url: jdbc:postgresql://localhost:5432/ds1_slave1
  45. username: root
  46. password:
  47. connectionTimeoutMilliseconds: 30000
  48. idleTimeoutMilliseconds: 60000
  49. maxLifetimeMilliseconds: 1800000
  50. maxPoolSize: 65
  51. shardingRule:
  52. tables:
  53. t_order:
  54. actualDataNodes: ms_ds${0..1}.t_order${0..1}
  55. databaseStrategy:
  56. inline:
  57. shardingColumn: user_id
  58. algorithmExpression: ms_ds${user_id % 2}
  59. tableStrategy:
  60. inline:
  61. shardingColumn: order_id
  62. algorithmExpression: t_order${order_id % 2}
  63. keyGenerator:
  64. type: SNOWFLAKE
  65. column: order_id
  66. t_order_item:
  67. actualDataNodes: ms_ds${0..1}.t_order_item${0..1}
  68. databaseStrategy:
  69. inline:
  70. shardingColumn: user_id
  71. algorithmExpression: ms_ds${user_id % 2}
  72. tableStrategy:
  73. inline:
  74. shardingColumn: order_id
  75. algorithmExpression: t_order_item${order_id % 2}
  76. keyGenerator:
  77. type: SNOWFLAKE
  78. column: order_item_id
  79. bindingTables:
  80. - t_order,t_order_item
  81. broadcastTables:
  82. - t_config
  83. defaultDataSourceName: ds0
  84. defaultTableStrategy:
  85. none:
  86. masterSlaveRules:
  87. ms_ds0:
  88. masterDataSourceName: ds0
  89. slaveDataSourceNames:
  90. - ds0_slave0
  91. - ds0_slave1
  92. loadBalanceAlgorithmType: ROUND_ROBIN
  93. ms_ds1:
  94. masterDataSourceName: ds1
  95. slaveDataSourceNames:
  96. - ds1_slave0
  97. - ds1_slave1
  98. loadBalanceAlgorithmType: ROUND_ROBIN

数据分片 + 数据脱敏

dataSources:

  1. schemaName: sharding_db
  2. dataSources:
  3. ds0:
  4. url: jdbc:postgresql://localhost:5432/ds0
  5. username: root
  6. password:
  7. connectionTimeoutMilliseconds: 30000
  8. idleTimeoutMilliseconds: 60000
  9. maxLifetimeMilliseconds: 1800000
  10. maxPoolSize: 65
  11. ds1:
  12. url: jdbc:postgresql://localhost:5432/ds1
  13. username: root
  14. password:
  15. connectionTimeoutMilliseconds: 30000
  16. idleTimeoutMilliseconds: 60000
  17. maxLifetimeMilliseconds: 1800000
  18. maxPoolSize: 65
  19. shardingRule:
  20. tables:
  21. t_order:
  22. actualDataNodes: ds${0..1}.t_order${0..1}
  23. databaseStrategy:
  24. inline:
  25. shardingColumn: user_id
  26. algorithmExpression: ds${user_id % 2}
  27. tableStrategy:
  28. inline:
  29. shardingColumn: order_id
  30. algorithmExpression: t_order${order_id % 2}
  31. keyGenerator:
  32. type: SNOWFLAKE
  33. column: order_id
  34. t_order_item:
  35. actualDataNodes: ds${0..1}.t_order_item${0..1}
  36. databaseStrategy:
  37. inline:
  38. shardingColumn: user_id
  39. algorithmExpression: ds${user_id % 2}
  40. tableStrategy:
  41. inline:
  42. shardingColumn: order_id
  43. algorithmExpression: t_order_item${order_id % 2}
  44. keyGenerator:
  45. type: SNOWFLAKE
  46. column: order_item_id
  47. bindingTables:
  48. - t_order,t_order_item
  49. defaultTableStrategy:
  50. none:
  51. encryptRule:
  52. encryptors:
  53. encryptor_aes:
  54. type: aes
  55. props:
  56. aes.key.value: 123456abc
  57. tables:
  58. t_order:
  59. columns:
  60. order_id:
  61. plainColumn: order_plain
  62. cipherColumn: order_cipher
  63. encryptor: encryptor_aes

全局配置示例

Sharding-Proxy使用conf/server.yaml配置注册中心、认证信息以及公用属性。

治理

治理模块目前支持配置中心和注册中心,具体配置为:

  • orchestrationType: config_center #配置配置中心
  • orchestrationType: registry_center #配置注册中心
  • orchestrationType: config_center,registry_center #同时配置配置中心和注册中心
  1. #省略数据分片和读写分离配置
  2. orchestration:
  3. orchestration_ds:
  4. orchestrationType: config_center,registry_center
  5. instanceType: zookeeper
  6. serverLists: localhost:2181
  7. namespace: orchestration
  8. props:
  9. overwrite: true

认证信息

  1. authentication:
  2. users:
  3. root:
  4. password: root
  5. sharding:
  6. password: sharding
  7. authorizedSchemas: sharding_db

公用属性

  1. props:
  2. executor.size: 16
  3. sql.show: false

数据源与分片配置项说明

数据分片

  1. schemaName: #逻辑数据源名称
  2. dataSources: #数据源配置,可配置多个data_source_name
  3. <data_source_name>: #与Sharding-JDBC配置不同,无需配置数据库连接池
  4. url: #数据库url连接
  5. username: #数据库用户名
  6. password: #数据库密码
  7. connectionTimeoutMilliseconds: 30000 #连接超时毫秒数
  8. idleTimeoutMilliseconds: 60000 #空闲连接回收超时毫秒数
  9. maxLifetimeMilliseconds: 1800000 #连接最大存活时间毫秒数
  10. maxPoolSize: 65 #最大连接数
  11. shardingRule: #省略数据分片配置,与Sharding-JDBC配置一致

读写分离

  1. schemaName: #逻辑数据源名称
  2. dataSources: #省略数据源配置,与数据分片一致
  3. masterSlaveRule: #省略读写分离配置,与Sharding-JDBC配置一致

数据脱敏

  1. dataSource: #省略数据源配置
  2. encryptRule:
  3. encryptors:
  4. <encryptor-name>:
  5. type: #加解密器类型,可自定义或选择内置类型:MD5/AES
  6. props: #属性配置, 注意:使用AES加密器,需要配置AES加密器的KEY属性:aes.key.value
  7. aes.key.value:
  8. tables:
  9. <table-name>:
  10. columns:
  11. <logic-column-name>:
  12. plainColumn: #存储明文的字段
  13. cipherColumn: #存储密文的字段
  14. assistedQueryColumn: #辅助查询字段,针对ShardingQueryAssistedEncryptor类型的加解密器进行辅助查询
  15. encryptor: #加密器名字
  16. props:
  17. query.with.cipher.column: true #是否使用密文列查询

全局配置项说明

治理与Sharding-JDBC配置一致。

Proxy属性

  1. #省略与Sharding-JDBC一致的配置属性
  2. props:
  3. acceptor.size: #用于设置接收客户端请求的工作线程个数,默认为CPU核数*2
  4. proxy.transaction.type: #默认为LOCAL事务,允许LOCAL,XA,BASE三个值,XA采用Atomikos作为事务管理器,BASE类型需要拷贝实现ShardingTransactionManager的接口的jar包至lib目录中
  5. proxy.opentracing.enabled: #是否开启链路追踪功能,默认为不开启。详情请参见[链路追踪](/cn/features/orchestration/apm/)
  6. check.table.metadata.enabled: #是否在启动时检查分表元数据一致性,默认值: false
  7. proxy.frontend.flush.threshold: # 对于单个大查询,每多少个网络包返回一次

权限验证

用于执行登录Sharding Proxy的权限验证。配置用户名、密码、可访问的数据库后,必须使用正确的用户名、密码才可登录Proxy。

  1. authentication:
  2. users:
  3. root: # 自定义用户名
  4. password: root # 自定义用户名
  5. sharding: # 自定义用户名
  6. password: sharding # 自定义用户名
  7. authorizedSchemas: sharding_db, masterslave_db # 该用户授权可访问的数据库,多个用逗号分隔。缺省将拥有root权限,可访问全部数据库。

10.3 5.0.0-alpha版本

下载最新的sharding-proxy变成 5.0.0-alpha了。配置发生了一些改变。具体可以 下载下来看给的配置文件实列。

需要注意以下问题:

  • 使用zk作为注册中心时,需要zk服务 3.x版本。
  • 配置zk作为配置中心时会启动报错。配置类为GovernanceSpringBootRootConfiguration 配置中心的属性为additionalConfigCenter。
  • 连接mysql可以看到所有的数据库。

注册中心中会保存有我们本地配置的路由规则,数据源等配置信息。

Mysql下笔记 - 图36

12 Mycat

12.1 简介

一个彻底开源的,面向企业应用开发的大数据库集群

支持事务、ACID、可以替代MySQL的加强版数据库

一个可以视为MySQL集群的企业级数据库,用来替代昂贵的Oracle集群

一个融合内存缓存技术、NoSQL技术、HDFS大数据的新型SQL Server

结合传统数据库和新型分布式数据仓库的新一代企业级数据库产品

一个新颖的数据库中间件产品

Mycat 是一个实现了 MySQL 协议的 Server,前端用户可以把它看作是一个数据库代理,用 MySQL 客 户端工具和命令行访问,而其后端可以用 MySQL 原生协议或JDBC 协议与多个 MySQL 服务器通信, 其核心功能是分库分表和读写分离,即将一个大表水平分割为 N 个小表,存储在后端 MySQL 服务器里 或者其他数据库里。

  • 对于 DBA 来说,可以这么理解 Mycat Mycat 就是 MySQL Server,但是Mycat 本身并不存储数据,数据是在后端的 MySQL 上存储的, 因此数据可靠性以及事务等都是 MySQL 保证的。简单的说,Mycat 就是 MySQL 最佳伴侣。

  • 对于软件工程师来说,可以这么理解 Mycat Mycat 就是一个近似等于 MySQL 的数据库服务器,你可以用连接 MySQL 的方式去连接 Mycat(除了端 口不同,默认的 Mycat 端口是 8066 而非 MySQL 的 3306,因此需要在连接字符 串上增加端口信息),大多数 情况下,可以用你熟悉的对象映射框架使用 Mycat,但建议对于分 片表,尽量使用基础的 SQL 语句,因为这样能 达到最佳性能,特别是几千万甚至几百亿条记录的 情况下。

  • 对于架构师来说,可以这么理解 Mycat Mycat 是一个强大的数据库中间件,不仅仅可以用作读写分离、以及分表分库、容灾备份,而且 可以用于多 用户应用开发、云平台基础设施、让你的架构具备很强的适应性和灵活性,借助于即 将发布的 Mycat 智能优化模块,系统的数据访问瓶颈和热点一目了然,根据这些统计分析数据, 你可以自动或手工调整后端存储,将不同的 表映射到不同存储引擎上,而整个应用的代码一行也 不用改变。

12.2 关键特性

  1. 支持SQL92标准
  2. 支持MySQL、Oracle、DB2、SQL Server、PostgreSQL等DB的常见SQL语法
  3. 遵守Mysql原生协议,跨语言,跨平台,跨数据库的通用中间件代理。
  4. 基于心跳的自动故障切换,支持读写分离,支持MySQL主从,以及galera cluster集群。
  5. 支持Galera for MySQL集群,Percona Cluster或者MariaDB cluster
  6. 基于Nio实现,有效管理线程,解决高并发问题。
  7. 支持数据的多片自动路由与聚合,支持sum,count,max等常用的聚合函数,支持跨库分页。
  8. 支持单库内部任意join,支持跨库2表join,甚至基于caltlet的多表join。
  9. 支持通过全局表,ER关系的分片策略,实现了高效的多表join查询。
  10. 支持多租户方案。
  11. 支持分布式事务(弱xa)。
  12. 支持XA分布式事务(1.6.5)。
  13. 支持全局序列号,解决分布式下的主键生成问题。
  14. 分片规则丰富,插件化开发,易于扩展。
  15. 强大的web,命令行监控。
  16. 支持前端作为MySQL通用代理,后端JDBC方式支持Oracle、DB2、SQL Server 、 mongodb 、巨杉。
  17. 支持密码加密
  18. 支持服务降级
  19. 支持IP白名单
  20. 支持SQL黑名单、sql注入攻击拦截
  21. 支持prepare预编译指令(1.6)
  22. 支持非堆内存(Direct Memory)聚合计算(1.6)
  23. 支持PostgreSQL的native协议(1.6)
  24. 支持mysql和oracle存储过程,out参数、多结果集返回(1.6)
  25. 支持zookeeper协调主从切换、zk序列、配置zk化(1.6)
  26. 支持库内分表(1.6)
  27. 集群基于ZooKeeper管理,在线升级,扩容,智能优化,大数据处理(2.0开发版)。

12.3 核心概念

  • 逻辑库
    对数据进行分片处理之后,从原有的一个库,被切分为多个分片数据库,所有的分片数据库集群构成了 整个完整的数据库存储。Mycat在操作时,使用逻辑库来代表这个完整的数据库集群,便于对整个集群 操作。

  • 逻辑表
    既然有逻辑库,那么就会有逻辑表,分布式数据库中,对应用来说,读写数据的表就是逻辑表。

  • 分片表 分片表,是指那些原有的很大数据的表,需要切分到多个数据库的表,这样,每个分片都有一部分数 据,所 有分片构成了完整的数据。例如在 mycat 配置中的 t_node 就属于分片表,数据按照规则被分 到 dn1,dn2 两个分片节点上。

    1. <table name="t_node" primaryKey="vid" autoIncrement="true" dataNode="dn1,dn2"
    2. rule="rule1" />
  • 非分片表
    一个数据库中并不是所有的表都很大,某些表是可以不用进行切分的,非分片是相对分片表来说的,就是那些不需要进行数据切分的表。如下配置中 t_node,只存在于分片节点dn1上。
    1. <table name="t_node" primaryKey="vid" autoIncrement="true" dataNode="dn1" />
  • ER表
    Mycat提出了基于 E-R 关系的数据分片策略,子表的记录与所关联的父表记录存放在同一个数据分片 上,即子表依赖于父表,通过表分组(Table Group)保证数据 join 不会跨库操作。表分组(Table Group) 是解决跨分片数据 join 的一种很好的思路,也是数据切分规划的重要一条规则。

  • 全局表
    一个真实的业务系统中,往往存在大量的类似字典表的表,这些表基本上很少变动,字典表具有以下几 个特 性:

    • 变动不频繁;
    • 数据量总体变化不大;
    • 数据规模不大,很少有超过数十万条记录。
  • 分片节点
    数据切分后,一个大表被分到不同的分片数据库上面,每个表分片所在的数据库就是分片节点 dataNode。

  • 节点主机
    数据切分后,每个分片节点不一定都会独占一台机器,同一机器上面可以有多个分片数据库, 这样一个 或多个分片节点所在的机器就是节点主机,为了规避单节点主机并发数限制, 尽量将读写压力高的分片 节点均衡的放在不同的节点主机dataHost。

  • 分片规则
    前面讲了数据切分,一个大表被分成若干个分片表,就需要一定的规则rule,这样按照某种业务规则把数据分到某个分片的规则就是分片规则,数据切分选择合适的分片规则非常重要,将极大的避免后续数据处理的难度。

12.4 配置文件

官方文档:【http://www.mycat.org.cn/document/mycat-definitive-guide.pdf】

server.xml

server.xml几乎保存了所有 mycat 需要的系统配置信息。

user标签

这个标签主要用于定义登录 mycat 的用户和权限。例如下面的例子中,我们定义了一个用户,用户名 为 user、密码也为 user,可访问的 schema为order。

  1. <user name="root">
  2. <property name="password">root</property>
  3. <!--逻辑库-->
  4. <property name="schemas">order</property>
  5. <property name="readOnly">true</property>
  6. <property name="defaultSchema">order</property>
  7. </user>

firewall标签

  1. <firewall>
  2. <!-- ip白名单 用户对应的可以访问的 ip 地址 -->
  3. <whitehost>
  4. <host host="127.0.0.*" user="root"/>
  5. <host host="127.0.*" user="root"/>
  6. <host host="127.*" user="root"/>
  7. <host host="1*7.*" user="root"/>
  8. </whitehost>
  9. <!-- 黑名单允许的 权限 后面为默认 还有更多的权限配置可以参考官方文档-->
  10. <blacklist check="true">
  11. <property name="selelctAllow">false</property>
  12. <property name="selelctIntoAllow">false</property>
  13. <property name="updateAllow">false</property>
  14. <property name="insertAllow">false</property>
  15. <property name="deletetAllow">false</property>
  16. <property name="dropAllow">false</property>
  17. <property name="truncateAllow">false</property>
  18. <property name="createTableAllow">false</property>
  19. <property name="dropTableAllow">false</property>
  20. </blacklist>
  21. </firewall>

永真条件 也是在blacklist配置的

  1. selectWhereAlwayTrueCheck true 检查 SELECT 语句的 WHERE 子句是否是一个永真条件
  2. selectHavingAlwayTrueCheck true 检查 SELECT 语句的 HAVING 子句是否是一个永真条件
  3. deleteWhereAlwayTrueCheck true 检查 DELETE 语句的 WHERE 子句是否是一个永真条件
  4. deleteWhereNoneCheck false 检查 DELETE 语句是否无 where 条件,这是有风险的,但不是 SQL 注入类型的风险
  5. updateWhereAlayTrueCheck true 检查 UPDATE 语句的 WHERE 子句是否是一个永真条件
  6. updateWhereNoneCheck false 检查 UPDATE 语句是否无 where 条件,这是有风险的,但不是SQL 注入类型的风险
  7. conditionAndAlwayTrueAllow false 检查查询条件(WHERE/HAVING 子句)中是否包含 AND 永真条件
  8. conditionAndAlwayFalseAllow false 检查查询条件(WHERE/HAVING 子句)中是否包含 AND 永假条件
  9. conditionLikeTrueAllow true 检查查询条件(WHERE/HAVING 子句)中是否包含 LIKE 永真条件

全局序列号

在实现分库分表的情况下,数据库自增主键已无法保证自增主键的全局唯一。为此,Mycat 提供了全局 sequence,并且提供了包含本地配置和数据库配置等多种实现方式。

  1. <system>
  2. <property name="sequnceHandlerType">0</property>
  3. </system>

0表示使用本地文件方式;1表示使用数据库方式生成;2表示使用本地时间戳方式;3表示基于ZK与本地配置的分布式ID生成器;4表示使用zookeeper递增方式生成

  • 0 本地文件
    此方式 Mycat 将 sequence 配置到文件中,当使用到 sequence 中的配置后,Mycat 会在classpath 中的 sequence_conf.properties 文件中 sequence 当前的值。
    1. #default global sequence
    2. GLOBAL.HISIDS=
    3. GLOBAL.MINID=10001
    4. GLOBAL.MAXID=20000
    5. GLOBAL.CURID=10000
    6. # self define sequence
    7. COMPANY.HISIDS=
    8. COMPANY.MINID=1001
    9. COMPANY.MAXID=2000
    10. COMPANY.CURID=1000
    11. ORDER.HISIDS=
    12. ORDER.MINID=1001
    13. ORDER.MAXID=2000
    14. ORDER.CURID=1000
  • 1 数据库方式
    在数据库中建立一张表,存放 sequence 名称(name),sequence 当前值(current_value),步长 (increment) 等信息。
    需要在sequence_db_conf.properties进行配置。然后执行dbseq.sql脚本文件
    1. CREATE TABLE MYCAT_SEQUENCE
    2. (
    3. name VARCHAR(64) NOT NULL,
    4. current_value BIGINT(20) NOT NULL,
    5. increment INT NOT NULL DEFAULT 1,
    6. PRIMARY KEY (name)
    7. ) ENGINE = InnoDB;
  • 2 本地时间戳方式 雪花算法
    ID为64 位二进制 ,42(毫秒)+5(机器 ID)+5(业务编码)+12(重复累加) 换算成十进制为 18 位数的 long 类型,每毫秒可以并发 12 位二进制的累加。 在 Mycat 下配置sequence_time_conf.properties文件
    1. WORKID=0-31 任意整数
    2. DATAACENTERID=0-31 任意整数


每个Mycat 配置的 WORKID、DATAACENTERID 不同,组成唯一标识,总共支持32*32=1024 种组合。

  • 3 分布式 ZK ID 生成器
    Zk 的连接信息统一在 myid.properties 的 zkURL 属性中配置。基于 ZK 与本地配置的分布式 ID 生成 器,InstanceID可以通过ZK自动获取,也可以通过配置文件配置。在 sequence_distributed_conf.properties,只要配置INSTANCEID=ZK就表示从 ZK 上获取 InstanceID。
    ID 最大为63位二进制,可以承受单机房单机器单线程 1000*(2^6)=640000 的并发。结构如下:
    current time millis(微秒时间戳 38 位,可以使用 17 年)
    clusterId(机房或者 ZKid,通过配置文件配置,5 位)
    instanceId(实例 ID,可以通过 ZK 或者配置文件获取,5 位)
    threadId(线程 ID,9 位)
    increment(自增,6 位)

  • 4 ZK 递增方式
    Zk 的连接信息统一在 myid.properties 的 zkURL 属性中配置。需要配置sequence_conf.properties文件
    TABLE.MINID 某线程当前区间内最小值
    TABLE.MAXID 某线程当前区间内最大值
    TABLE.CURID 某线程当前区间内当前值

bindIp : mycat 服务监听的 IP 地址,默认值为 0.0.0.0。

serverPort : 定义 mycat 的使用端口,默认值为 8066。

managerPort : 定义 mycat 的管理端口,默认值为 9066。

processors: 这个属性主要用于指定系统可用的线程数,默认值为机器 CPU 核心线程数。 主要影响 processorBufferPool、processorBufferLocalPercent、processorExecutor 属性。 NIOProcessor 的个数也是由这个属性定义的,所以调优的时候可以适当的调高这个属性。

processorBufferChunk: 这个属性指定每次分配 Socket Direct Buffer 的大小,默认是 4096 个字节。这个属性也影响 buffer pool 的 长度。如果一次性获取的数过大 buffer 不够用 经常出现警告,则可以适当调大。

processorBufferPool:这个属性指定 bufferPool 计算 比例值。由于每次执行 NIO 读、写操作都需要使用到 buffer,系统初始化的 时候会建立一定长度的 buffer 池来加快读、写的效率,减少建立 buffer 的时间。 Mycat 中有两个主要的 buffer 池:

  • BufferPool

  • ThreadLocalPool BufferPool 由 ThreadLocalPool 组合而成,每次从 BufferPool 中获取 buffer 都会优先获取 ThreadLocalPool 中的 buffer,未命中之后才会去获取 BufferPool 中的 buffer。也就是说ThreadLocalPool 是 作为 BufferPool 的二级缓存,每个线程内部自己使用的。当然,这其中还有一些限制条件需要线程的名字是由$ 开头。然而,BufferPool 上的 buffer 则是每个 NIOProcessor 都共享的。
    默认这个属性的值为: 默认 bufferChunkSize(4096)
    processors 属性 1000
    BufferPool 的总长度 = bufferPool / bufferChunk。
    若 bufferPool 不是 bufferChunk 的整数倍,则总长度为前面计算得出的商 + 1 假设系统线程数为 4,其他都为属性的默认值,则:
    bufferPool = 4096
    4 _ 1000
    BufferPool 的总长度 : 4000 = 16384000 / 4096。

processorBufferLocalPercent: 前面提到了 ThreadLocalPool。这个属性就是用来控制分配这个 pool 的大小用的,但其也并不是一个准确 的值,也是一个比例值。这个属性默认值为 100。

线程缓存百分比 = bufferLocalPercent / processors 属性。 例如,系统可以同时运行 4 个线程,使用默认值,则根据公式每个线程的百分比为 25。最后根据这个百分比 来计算出具体的 ThreadLocalPool 的长度公式如下:

ThreadLocalPool 的长度 = 线程缓存百分比 * BufferPool 长度 / 100

假设 BufferPool 的长度为 4000,其他保持默认值。

那么最后每个线程建立上的 ThreadLocalPool 的长度为: 1000 = 25 * 4000 / 100

processorExecutor:这个属性主要用于指定 NIOProcessor 上共享的 businessExecutor 固定线程池大小。mycat 在需要处理一 些异步逻辑的时候会把任务提交到这个线程池中。新版本中这个连接池的使用频率不是很大了,可以设置一个较 小的值

schema.xml配置

管理着 Mycat 的逻辑库、表、分片节点、主机等信 息。

以下是shema的示例配置文件

  1. <?xml version="1.0"?>
  2. <!DOCTYPE mycat:schema SYSTEM "schema.dtd">
  3. <mycat:schema xmlns:mycat="http://io.mycat/">
  4. <schema name="TESTDB" checkSQLschema="true" sqlMaxLimit="100" randomDataNode="dn1">
  5. <!-- auto sharding by id (long) -->
  6. <!--splitTableNames 启用<table name 属性使用逗号分割配置多个表,即多个表使用这个配置-->
  7. <!--fetchStoreNodeByJdbc 启用ER表使用JDBC方式获取DataNode-->
  8. <table name="customer" primaryKey="id" dataNode="dn1,dn2" rule="sharding-by-intfile" autoIncrement="true" fetchStoreNodeByJdbc="true">
  9. <childTable name="customer_addr" primaryKey="id" joinKey="customer_id" parentKey="id"> </childTable>
  10. </table>
  11. <!-- <table name="oc_call" primaryKey="ID" dataNode="dn1$0-743" rule="latest-month-calldate"
  12. /> -->
  13. </schema>
  14. <!-- <dataNode name="dn1$0-743" dataHost="localhost1" database="db$0-743"
  15. /> -->
  16. <dataNode name="dn1" dataHost="localhost1" database="db1" />
  17. <dataNode name="dn2" dataHost="localhost1" database="db2" />
  18. <dataNode name="dn3" dataHost="localhost1" database="db3" />
  19. <!--<dataNode name="dn4" dataHost="sequoiadb1" database="SAMPLE" />
  20. <dataNode name="jdbc_dn1" dataHost="jdbchost" database="db1" />
  21. <dataNode name="jdbc_dn2" dataHost="jdbchost" database="db2" />
  22. <dataNode name="jdbc_dn3" dataHost="jdbchost" database="db3" /> -->
  23. <dataHost name="localhost1" maxCon="1000" minCon="10" balance="0"
  24. writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
  25. <heartbeat>select user()</heartbeat>
  26. <!-- can have multi write hosts -->
  27. <writeHost host="hostM1" url="jdbc:mysql://localhost:3306" user="root"
  28. password="root">
  29. </writeHost>
  30. <!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> -->
  31. </dataHost>
  32. <!--
  33. <dataHost name="sequoiadb1" maxCon="1000" minCon="1" balance="0" dbType="sequoiadb" dbDriver="jdbc">
  34. <heartbeat> </heartbeat>
  35. <writeHost host="hostM1" url="sequoiadb://1426587161.dbaas.sequoialab.net:11920/SAMPLE" user="jifeng" password="jifeng"></writeHost>
  36. </dataHost>
  37. <dataHost name="oracle1" maxCon="1000" minCon="1" balance="0" writeType="0" dbType="oracle" dbDriver="jdbc"> <heartbeat>select 1 from dual</heartbeat>
  38. <connectionInitSql>alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss'</connectionInitSql>
  39. <writeHost host="hostM1" url="jdbc:oracle:thin:@127.0.0.1:1521:nange" user="base" password="123456" > </writeHost> </dataHost>
  40. <dataHost name="jdbchost" maxCon="1000" minCon="1" balance="0" writeType="0" dbType="mongodb" dbDriver="jdbc">
  41. <heartbeat>select user()</heartbeat>
  42. <writeHost host="hostM" url="mongodb://192.168.0.99/test" user="admin" password="123456" ></writeHost> </dataHost>
  43. <dataHost name="sparksql" maxCon="1000" minCon="1" balance="0" dbType="spark" dbDriver="jdbc">
  44. <heartbeat> </heartbeat>
  45. <writeHost host="hostM1" url="jdbc:hive2://feng01:10000" user="jifeng" password="jifeng"></writeHost> </dataHost> -->
  46. <!-- <dataHost name="jdbchost" maxCon="1000" minCon="10" balance="0" dbType="mysql"
  47. dbDriver="jdbc"> <heartbeat>select user()</heartbeat> <writeHost host="hostM1"
  48. url="jdbc:mysql://localhost:3306" user="root" password="123456"> </writeHost>
  49. </dataHost> -->
  50. </mycat:schema>

schema标签

schema 标签用于定义 Mycat 实例中的逻辑库,Mycat 可以有多个逻辑库,每个逻辑库都有自己的相关 配 置。可以使用 schema 标签来划分这些不同的逻辑库。

  1. <!-- 逻辑库 -->
  2. <schema name="order" checkSQLschema="true" sqlMaxLimit="100" dataNode="dn1"></schema>

dataNode:指定分片节点 对应dataNode的name属性。该属性用于绑定逻辑库到某个具体的 database 上,1.3 版本如果配置了 dataNode,则不可以配置分片表, 1.4 可以配置默认分片,只需要配置需要分片的表即可,具体如下配置。

  • 1.3配置
    1. <schema name="USERDB" checkSQLschema="false" sqlMaxLimit="100" dataNode="dn1">
    2. <!—里面不能配置任何表-->
    3. </schema>
  • 1.4 以上配置
    1. <schema name="USERDB" checkSQLschema="false" sqlMaxLimit="100" dataNode="dn2">
    2. <!—配置需要分片的表-->
    3. <table name=“tuser” dataNode=”dn1”/>
    4. </schema>


那么现在 tuser 就绑定到 dn1 所配置的具体 database 上,可以直接访问这个 database,没有配置的表则会走 默认节点 dn2,这里注意没有配置在分片里面的表工具查看无法显示,但是可以正常使用。

name:逻辑库名字

checkSQLschema:当该值设置为 true 时,如果我们执行语句select from order.travelrecord;则 MyCat 会把语句修改 为select from travelrecord;。即把表示 schema 的字符去掉,避免发送到后端数据库执行时报(ERROR 1146 (42S02): Table ‘order.travelrecord’ doesn’t exist)。

sqlMaxLimit:如果我们执行sql语句的时候没有加limit,mycat会自动加入limit。

  1. 需要注意的是,如果运行的 schema 为非拆分库的,那么该属性不会生效。需要手动添加 limit 语句。

table标签

逻辑表,需要拆分的表都需要再这里定义

  1. <table name="b_order" dataNode="dn1,dn2" rule="b_order_rule" primaryKey="ID"
  2. autoIncrement="true"/>

name:逻辑表名字,每一个schema标签下定义的名字必须唯一

dataNode:定义这个逻辑表所属的 dataNode, 该属性的值需要和 dataNode 标签中 name 属性的值相互对应。如果需 要定义的 dn 过多 可以使用如下的方法减少配置:

  1. <table name="travelrecord" dataNode="multipleDn$0-99,multipleDn2$100-199" rule="auto-shardinglong" ></table>
  2. <dataNode name="multipleDn$0-99" dataHost="localhost1" database="db$0-99" ></dataNode>
  3. <dataNode name="multipleDn2$100-199" dataHost="localhost1" database="db$100-199" ></dataNode>

这里需要注意的是 database 属性所指定的真实 database name 需要在后面添加一个,例如上面的例子中, 我需要在真实的 mysql 上建立名称为 db0 到 db99 的 database。

rule:该属性用于指定逻辑表要使用的规则名字,规则名字在 rule.xml 中定义,必须与 tableRule 标签中 name 属 性属性值一一对应。

ruleRequired:该属性用于指定表是否绑定分片规则,如果配置为 true,但没有配置具体 rule 的话 ,程序会报错。默认为false。

primaryKey:该逻辑表对应真实表的主键,例如:分片的规则是使用非主键进行分片的,那么在使用主键查询的时候,就 会发送查询语句到所有配置的 DN 上,如果使用该属性配置真实表的主键。那么 MyCat 会缓存主键与具体 DN 的 信息,那么再次使用主键进行查询的时候就不会进行广播式的查询,就会直接发送语句给具体的 DN,但是尽管配置该属性,如果缓存并没有命中的话,还是会发送语句给所有的 DN,来获得数据。【https://blog.csdn.net/kimva/article/details/78951887】

type:该属性定义了逻辑表的类型,目前逻辑表只有“全局表”和”普通表”两种类型。如果是全局表指定值为“global”。插入的时候会往指定的各个dataNode插入数据,但是查询的时候不会进行合并出现重复数据。

autoIncrement:mysql 对非自增长主键,使用 last_insert_id()是不会返回结果的,只会返回 0。所以,只有定义了自增长主 键的表才可以用 last_insert_id()返回主键值。 mycat 目前提供了自增长主键功能,但是如果对应的 mysql 节点上数据表,没有定义 auto_increment,那 么在 mycat 层调用 last_insert_id()也是不会返回结果的。 由于 insert 操作的时候没有带入分片键,mycat 会先取下这个表对应的全局序列,然后赋值给分片键。这样 才能正常的插入到数据库中,最后使用 last_insert_id()才会返回插入的分片键值。 如果要使用这个功能最好配合使用数据库模式的全局序列。 使用 autoIncrement=“true” 指定这个表有使用自增长主键,这样 mycat 才会不抛出分片键找不到的异常。默认为false。

subtables: 使用方式添加 subTables=”t_order$1-2,t_order3”。

目前分表 1.6 以后开始支持 并且 dataNode 在分表条件下只能配置一个,分表条件下不支持各种条件的 join 语句。

needAddLimit:指定表是否需要自动的在每个语句后面加上 limit 限制。由于使用了分库分表,数据量有时会特别巨大。这时 候执行查询语句,如果恰巧又忘记了加上数量限制的话。那么查询所有的数据出来,也够等上一小会儿的。 所以,mycat 就自动的为我们加上 LIMIT 100。当然,如果语句中有 limit,就不会在次添加了。 这个属性默认为 true,你也可以设置成 false`禁用掉默认行为。

dataNode标签

定义数据节点

  1. <!-- 数据节点 -->
  2. <dataNode name="dn1" dataHost="order_1" database="order1" />

name: 定义数据节点的名字,这个名字需要是唯一的,我们需要在 table 标签上应用这个名字,来建 立表与分片对应的关系。

dataHost : 用于定义该分片属于哪个分片主机,属性值是引用 dataHost 标签上定义的 name 属性。

database: 用于定义该分片节点属于哪个具体的库。为实际存在的物理库名。

dataHost标签

dataHost标签在 Mycat 逻辑库中也是作为最底层的标签存在,直接定义了具体的数据库实例、读写分 离配置和心跳语句。

  1. <dataHost name="order_1" maxCon="100" minCon="10" balance="0" writeType="0" dbType="mysql" dbDriver="native" switchType="1"
  2. slaveThreshold="100">
  3. </dataHost>

name:dataHost的唯一标识,给dataNode使用。

maxCon:指定每个读写实例连接池的最大连接。也就是说,标签内嵌套的 writeHost、readHost 标签都会使用这个属 性的值来实例化出连接池的最大连接数。

minCon:指定每个读写实例连接池的最小连接,初始化连接池的大小。

balance:

  • 0 不开启读写分离机制,所有读操作都发送到当前可用的 writeHost 上。
  • 1 全部的 readHost 与 stand by writeHost (备用的写库)参与 select 语句的负载均衡,简单的说,当双主双从模式(M1->S1,M2->S2,并且 M1 与 M2 互为主备),正常情况下,M2,S1,S2 都参与 select 语句的负载均衡。
  • 2 所有读操作都随机的在 writeHost、readhost 上分发。
  • 3 所有读请求随机的分发到 wiriterHost 对应的 readhost 执行,writerHost 不负担读压力。balance=3 只在 1.4 及其以后版本有,1.3 没有。

writeType:

  • -1 不自动切换

  • 0 所有写操作发送到配置的第一个 writeHost,第一个挂了切到还生存的第二个 writeHost, 重新启动后已切换后的为准,切换记录在配置文件中:dnindex.properties。

  • 1 所有写操作都随机的发送到配置的 writeHost,1.5 以后废弃不推荐。使用switchType 属性。

  • 2 基于 MySQL 主从同步的状态决定是否切换。

dbType:指定后端连接的数据库类型,目前支持二进制的 mysql 协议,还有其他使用 JDBC 连接的数据库。例如: mongodb、oracle、spark 等。

dbDriver:

指定连接后端数据库使用的 Driver,目前可选的值有 native 和 JDBC。使用 native 的话,因为这个值执行的 是二进制的 mysql 协议,所以可以使用 mysql 和 maridb。其他类型的数据库则需要使用 JDBC 驱动来支持。

从 1.6 版本开始支持 postgresql 的 native 原始协议。

如果使用 JDBC 的话需要将符合 JDBC 4 标准的驱动 JAR 包放到 MYCAT\lib 目录下,并检查驱动 JAR 包中 包括如下目录结构的文件:META-INF\services\java.sql.Driver。在这个文件内写上具体的 Driver 类名,例如: com.mysql.jdbc.Driver

switchType:

  • -1 不自动切换
  • 1 默认值,自动切换
  • 2 基于 MySQL 主从同步的状态决定是否切换,心跳语句为 show slave status

    • 1.4 开始支持 MySQL 主从复制状态绑定的读写分离机制,让读更加安全可靠,配置如下: MyCAT 心跳检查语句配置为 show slave status ,dataHost 上定义两个新属性: switchType=”2” 与 slaveThreshold=”100”,此时意味着开启 MySQL 主从复制状态绑定的读写分离与切换机制,Mycat 心 跳机 制通过检测 show slave status 中的 “Seconds_Behind_Master”, “Slave_IO_Running”, “Slave_SQL_Running” 三个字段来确定当前主从同步的状态以及 Seconds_Behind_Master 主从复制时 延, 当 Seconds_Behind_Master > slaveThreshold 时,读写分离筛选器会过滤掉此 Slave 机器,防止 读到很久之 前的旧数据,而当主节点宕机后,切换逻辑会检查 Slave 上的 Seconds_Behind_Master 是 否为 0,为 0 时则 表示主从同步,可以安全切换,否则不会切换。
  • 3 基于 MySQL galery cluster 的切换机制(适合集群)心跳语句为 show status like ‘wsrep%’

    • 1.4.1 开始支持 MySQL 集群模式,让读更加安全可靠,配置如下: MyCAT 心跳检查语句配置为 show status like ‘wsrep%’ ,dataHost 上定义两个新属性: switchType=”3” 此时意味着开启 MySQL 集群复制状态状态绑定的读写分离与切换机制,Mycat 心跳机制通过检测集群复制时延时,如果延时过大或者集群出现节点问题不会负载改节点。

heartbeat标签

这个标签内指明用于和后端数据库进行心跳检查的语句。例如,MYSQL 可以使用 select user(),Oracle 可以 使用 select 1 from dual 等。

这个标签还有一个 connectionInitSql 属性,主要是当使用 Oracla 数据库时,需要执行的初始化 SQL 语句就 这个放到这里面来。例如:alter session set nls_date_format=’yyyy-mm-dd hh24:mi:ss’

主从切换的语句必须是:show slave status

writeHost,readHost标签

这两个标签都指定后端数据库的相关配置给 mycat,用于实例化后端连接池。唯一不同的是,writeHost 指 定写实例、readHost 指定读实例,组着这些读写实例来满足系统的要求。

在一个 dataHost 内可以定义多个 writeHost 和 readHost。但是,如果 writeHost 指定的后端数据库宕机, 那么这个 writeHost 绑定的所有 readHost 都将不可用。另一方面,由于这个 writeHost 宕机系统会自动的检测 到,并切换到备用的 writeHost 上去。注意配置多个writeHost时,应该是主从互备的双master,或者时mysql集群,或者是一个主从结构。不然当一个writeHost挂了,切换到另一个之后就没有数据 了,还不如切换到从库。

  1. <dataHost name="order_2" maxCon="100" minCon="10" balance="0"
  2. writeType="0" dbType="mysql" dbDriver="native" switchType="1"
  3. slaveThreshold="100">
  4. <heartbeat>select user()</heartbeat>
  5. <writeHost host="M1" url="127.0.0.1:3306" user="root" password="1234">
  6. <!-- <readHost /> -->
  7. </writeHost>
  8. </dataHost>

host:主机名 writeHost M1 readHost S1

url:后端实例连接地址,如果是使用 native 的 dbDriver,则一般为 address:port 这种形式。用 JDBC 或其他的 dbDriver,则需要特殊指定。当使用 JDBC 时则可以这么写:jdbc:mysql://localhost:3306/。

user:用户名

password:密码

weight:权重配置在 readhost 中作为读节点的权重(1.4 以后)。

usingDecrypt:是否对密码加密默认 0 否 如需要开启配置 1。通常不开启,使用麻烦。

读写分离配置:

  1. <dataNode name="dn3" dataHost="localhost3" dataBase="city"/><!--库名是一样的-->
  2. <dataHost name="localhost3" maxCon="100" minCon="10" balance="0"
  3. writeType="0" dbType="mysql" dbDriver="native" switchType="1"
  4. slaveThreshold="100">
  5. <heartbeat>select user()</heartbeat>
  6. <writeHost host="M1" url="127.0.0.1:3306" user="root" password="1234">
  7. <readHost host="S1" url="127.0.0.1:307" user="root" password="root"/>
  8. </writeHost>
  9. </dataHost>

或者

  1. <dataHost name="localhost1" maxCon="1000" minCon="10" balance="1" writeType="0"
  2. dbType="mysql" dbDriver="native">
  3. <heartbeat>select user()</heartbeat>
  4. <!-- can have multi write hosts -->
  5. <writeHost host="M1" url="localhost:3306" user="root" password="root">
  6. </writeHost>
  7. <writeHost host="S1" url="localhost:3307" user="root" password="root">
  8. </writeHost>
  9. </dataHost>

以上两种取模第一种当写挂了读不可用,第二种可以继续使用,事务内部的一切操作都会走写节点,所 以读操作不要加事务,如果读延时较大,使用根据主从延时切换的读写分离,或者强制走写节点。

readHost是必须嵌套在readHost里面的。

如果有一个主库有多个从库,可以将一个从库放在外面使用writeHost。其他嵌套在主库的writeHost里面。

强制路由

一个查询 SQL 语句以/ !mycat /注解来确定其是走读节点还是写节点。

  1. 强制走从:
  2. /*!mycat:db_type=slave*/ select * from travelrecord //有效
  3. /*#mycat:db_type=slave*/ select * from travelrecord
  4. 强制走写:
  5. /*!mycat:db_type=master*/ select * from travelrecord //有效
  6. /*#mycat:db_type=slave*/ select * from travelrecord
  1. /*!mycat:sql=sql */ 指定真正执行的SQL
  2. /*!mycat:schema=schema1 */ 指定走那个schema
  3. /*!mycat:datanode=dn1 */ 指定sql要运行的节点
  4. /*!mycat:catl

rule.xml配置

用于定义分片规则

tableRule标签

  1. <tableRule name="c_order_rule">
  2. <rule>
  3. <columns>user_id</columns>
  4. <algorithm>partitionByOrderFunc</algorithm>
  5. </rule>
  6. </tableRule>

name:指定唯一的名字,用于标识不同的表规则。 schema.xml table标签里面指定的规则名就是这个。

columns:指定要拆分的列名字。

algorithm:使用 function 标签中的 name 属性,连接表规则和具体路由算法。

function标签

  1. <function name="partitionByOrderFunc" class="io.mycat.route.function.PartitionByMod">
  2. <property name="count">2</property>
  3. </function>

name:指定算法的名字。

class:制定路由算法具体的类名字。

property: 为具体算法需要用到的一些属性。

mycat的对复杂的分片算法支持不友好,sharding-jdbc和sharding-proxy可以支持复杂的分片算法。

一、枚举法

如果数据某一字段大部分都是相同数据,可以用这种方式作为分片键。

比如按照区域东南,东北,西北,西南划分。

  1. <function name="hash-int" class="io.mycat.route.function.PartitionByFileMap">
  2. <property name="mapFile">partition-hash-int.txt</property>
  3. <property name="type">0</property>
  4. <property name="defaultNode">0</property>
  5. </function>

partition-hash-int.txt 配置:

10000=0

10010=1

比如数据库有一个列column名为area,area有 东南,东北,西北,西南,华中五个值

  1. 东南=0
  2. 东北=1
  3. 西北=2
  4. 西南=2
  5. 华中=0

其中分片函数配置中,mapFile标识配置文件名称,type默认值为0,0表示Integer,非零表示String,

所有的节点配置都是从0开始,及0代表节点1

  1. <function name="hash-str"
  2. class="io.mycat.route.function.PartitionByFileMap">
  3. <property name="mapFile">partition-hash-str.txt</property>
  4. <property name="type">2</property>
  5. </function>

/**

  • defaultNode 默认节点:小于0表示不设置默认节点,大于等于0表示设置默认节点,结点为指定的值
    *
    默认节点的作用:枚举分片时,如果碰到不识别的枚举值,就让它路由到默认节点
  • 如果不配置默认节点(defaultNode值小于0表示不配置默认节点),碰到
  • 不识别的枚举值就会报错,
  • like this:can’t find datanode for sharding column:column_name val:ffffffff
    */

二、固定分片hash算法

  1. <function name="func1" class="io.mycat.route.function.PartitionByLong">
  2. <property name="partitionCount">2,1</property>
  3. <property name="partitionLength">256,512</property>
  4. </function>

配置说明:

partitionCount 分片个数列表,partitionLength 分片范围列表
分区长度:默认为最大2^n=1024 ,即最大支持1024分区

约束 :

count,length两个数组的长度必须是一致的。
1024 = sum((count[i]*length[i]))

用法例子:

  1. 用法例子: 本例的分区策略:希望将数据水平分成3份,前两份各占25%,第三份占50%。(故本例非均匀分区)
  2. // |<—————————————1024—————————————>|
  3. // |<—-256—>|<—-256—>|<———-512————->|
  4. // | partition0 | partition1 | partition2 |
  5. 均匀分配
  6. <function name="func1" class="io.mycat.route.function.PartitionByLong">
  7. <property name="partitionCount">8</property>
  8. <property name="partitionLength">128</property>
  9. </function>

三、范围约定

  1. <function name="rang-long"
  2. class="io.mycat.route.function.AutoPartitionByLong">
  3. <property name="mapFile">autopartition-long.txt</property>
  4. </function>

# range start-end ,data node index

# K=1000,M=10000.

0-500M=0

500M-1000M=1

1000M-1500M=2

0-10000000=0

10000001-20000000=1

配置说明:

rang-long 函数中mapFile代表配置文件路径

所有的节点配置都是从0开始,及0代表节点1,此配置非常简单,即预先制定可能的id范围到某个分片

四、求模法

  1. <function name="mod-long" class="io.mycat.route.function.PartitionByMod">
  2. <!-- how many data nodes -->
  3. <property name="count">2</property>
  4. </function>

配置说明:

上面columns 标识将要分片的表字段,algorithm 分片函数,

此种配置非常明确即根据id与count(你的结点数)进行求模预算,相比方式1,此种在批量插入时需要切换数据源,id不连续

五、日期列分区法

  1. <function name="partbyday"
  2. class="io.mycat.route.function.PartitionByDate">
  3. <property name="dateFormat">yyyy-MM-dd</property>
  4. <property name="sNaturalDay">0</property>
  5. <property name="sBeginDate">2014-01-01</property>
  6. <property name="sEndDate">2014-01-31</property>
  7. <property name="sPartionDay">10</property>
  8. </function>

配置说明:

配置中配置了开始日期,分区天数,即默认从开始日期算起,分隔10天一个分区

还有一切特性请看源码

Assert.assertEquals(true, 0 == partition.calculate(“2014-01-01”));
Assert.assertEquals(true, 0 == partition.calculate(“2014-01-10”));
Assert.assertEquals(true, 1 == partition.calculate(“2014-01-11”));
Assert.assertEquals(true, 12 == partition.calculate(“2014-05-01”));

六、通配取模

  1. <function name="sharding-by-pattern" class="io.mycat.route.function.PartitionByPattern">
  2. <property name="patternValue">256</property>
  3. <property name="defaultNode">0</property>
  4. <property name="mapFile">partition-pattern.txt</property>
  5. </function>

partition-pattern.txt

id partition range start-end ,data node index

first host configuration

1-32=0

33-64=1

65-96=2

97-128=3

## second host configuration

129-160=4

161-192=5

193-224=6

225-256=7

0-0=7

配置说明:

上面columns 标识将要分片的表字段,algorithm 分片函数,patternValue 即求模基数,defaoultNode 默认节点,如果不配置了默认,则默认是0即第一个结点

mapFile 配置文件路径

配置文件中,1-32 即代表id%256后分布的范围,如果在1-32则在分区1,其他类推,如果id非数字数据,则会分配在defaoultNode 默认节点

String idVal = “0”;

Assert.assertEquals(true, 7 == autoPartition.calculate(idVal));
idVal = “45a”;
Assert.assertEquals(true, 2 == autoPartition.calculate(idVal));

七、ASCII码求模通配

  1. <function name="sharding-by-pattern" class="io.mycat.route.function.PartitionByPrefixPattern">
  2. <property name="patternValue">256</property>
  3. <property name="prefixLength">5</property>
  4. <property name="mapFile">partition-pattern.txt</property>
  5. </function>

partition-pattern.txt

range start-end ,data node index

ASCII

48-57=0-9

64、65-90=@、A-Z

97-122=a-z

first host configuration

1-4=0

5-8=1

9-12=2

13-16=3

second host configuration

17-20=4

21-24=5

25-28=6

29-32=7

0-0=7

配置说明:

patternValue 即求模基数,prefixLength ASCII 截取的位数

mapFile 配置文件路径

配置文件中,1-32 即代表id%256后分布的范围,如果在1-32则在分区1,其他类推

此种方式类似方式6只不过采取的是将列种获取前prefixLength位列所有ASCII码的和进行求模

sum%patternValue ,获取的值,在通配范围内的

即 分片数,

/**

  • ASCII编码:
  • 48-57=0-9阿拉伯数字
  • 64、65-90=@、A-Z
  • 97-122=a-z

    /

String idVal=”gf89f9a”;
Assert.assertEquals(true, 0==autoPartition.calculate(idVal));

idVal=”8df99a”;
Assert.assertEquals(true, 4==autoPartition.calculate(idVal));

idVal=”8dhdf99a”;
Assert.assertEquals(true, 3==autoPartition.calculate(idVal));

八、编程指定

  1. <function name="sharding-by-substring" class="io.mycat.route.function.PartitionDirectBySubString">
  2. <property name="startIndex">0</property> <!-- zero-based -->
  3. <property name="size">2</property>
  4. <property name="partitionCount">8</property>
  5. <property name="defaultPartition">0</property>
  6. </function>

配置说明:

此方法为直接根据字符子串(必须是数字)计算分区号(由应用传递参数,显式指定分区号)。

例如id=05100000002

在此配置中代表根据id中从startIndex=0,开始,截取siz=2位数字即05,05就是获取的分区,如果没传或者获取到的数字大于partitionCount默认分配到defaultPartition

  1. public Integer calculate(String columnValue) {
  2. String partitionSubString = columnValue.substring(this.startIndex, this.startIndex + this.size);
  3. int partition = Integer.parseInt(partitionSubString, 10);
  4. return this.partitionCount > 0 && partition >= this.partitionCount ? this.defaultPartition : partition;
  5. }

九、字符串拆分hash解析

https://www.jianshu.com/p/0c48b20280be】

  1. <function name="sharding-by-stringhash" class="org.opencloudb.route.function.PartitionByString">
  2. <property name="partitionLength">A,B</property><!-- zero-based -->
  3. <property name="partitionCount">C,D</property>
  4. <property name="hashSlice">0:2</property>
  5. </function>

配置说明:

partitionLength是求模基数

partitionCounth是分区数 C+D=分区数量 是从0开始到C+D-1

且AC + BD = 1024不然会抛出异常

hashSlice是解决分偏键的startIndex和endIndex,然后根据子字符串 hash运算 获取分片值

hashSlice : 0 means str.length(), -1 means str.length()-1

  1. "2" -> (0,2)
  2. "1:2" -> (1,2)
  3. "1:" -> (1,0)
  4. "-1:" -> (-1,0)
  5. ":-1" -> (0,-1)
  6. ":" -> (0,0)
  1. @Test
  2. public void testPartitionByString(){
  3. PartitionByString partition = new PartitionByString();
  4. partition.setPartitionCount("1,3");
  5. //这两个数组上下相乘要等与1024才可以 256*1 + 256*3 = 1024
  6. partition.setPartitionLength("256,256");
  7. partition.setHashSlice("2:6");
  8. //初始化
  9. partition.init();
  10. String key1 = "er568yuiop";
  11. String key2 = "sdffajsaswrw";
  12. String key3 = "w9204382sdfs";
  13. String key4 = "234uiHds";
  14. String key5 = "423423423";
  15. int partitionNum = partition.getPartitionNum();
  16. System.out.println("partionNum"+partitionNum);
  17. System.out.println("key1=》"+partition.calculate(key1));
  18. System.out.println("key2=》"+partition.calculate(key2));
  19. System.out.println("key3=》"+partition.calculate(key3));
  20. System.out.println("key4=》"+partition.calculate(key4));
  21. System.out.println("key5=》"+partition.calculate(key5));
  22. }

结果:

  1. partionNum4
  2. key1=》1
  3. key2=》0
  4. key3=》1
  5. key4=》3
  6. key5=》0

十、一致性hash

  1. <function name="murmur" class="io.mycat.route.function.PartitionByMurmurHash">
  2. <property name="seed">0</property><!-- 默认是0-->
  3. <property name="count">2</property><!-- 要分片的数据库节点数量,必须指定,否则没法分片—>
  4. <property name="virtualBucketTimes">160</property><!-- 一个实际的数据库节点被映射为这么多虚拟节点,默认是160倍,也就是虚拟节点数是物理节点数的160倍-->
  5. <!--
  6. <property name="weightMapFile">weightMapFile</property>
  7. 节点的权重,没有指定权重的节点默认是1。以properties文件的格式填写,以从0开始到count-1的整数值也就是节点索引为key,以节点权重值为值。所有权重值必须是正整数,否则以1代替 -->
  8. <!--
  9. <property name="bucketMapPath">/etc/mycat/bucketMapPath</property>
  10. 用于测试时观察各物理节点与虚拟节点的分布情况,如果指定了这个属性,会把虚拟节点的murmur hash值与物理节点的映射按行输出到这个文件,没有默认值,如果不指定,就不会输出任何东西 -->
  11. </function>

一致性hash预算有效解决了分布式数据的扩容问题,前1-9中id规则都多少存在数据扩容难题,而10规则解决了数据扩容难点

12.5 事务控制

  1. <!--分布式事务开关,0为不过滤分布式事务,1为过滤分布式事务(如果分布式事务内只涉及全局表,则不过滤,相当于禁用),2为不过滤分布式事务,但是记录分布式事务日志-->
  2. <property name="handleDistributedTransactions">0</property>

Mycat 1.6.5以前版本分布式事务是一种 弱XA 的方式

整个XA协议主要基于二阶段提交的概念。这个概念很好理解,当TM接到一个全局事务请求的时候,TM会把请求告知注册在它身上的所有RM,当所有RM准备就绪后,再执行commit操作。在这个过程中,如果某个RMcommit失败了,那么TM会进行协调,一可以回滚其它所有RM上的事务,二是等这个失败的RM恢复后,重新进行commit。

TM是用日志来记录XA事务的状态的,且日志必须存储的很可靠。当然,真个XA协议,就是建立在可靠的TM和RM之上。

而mycat的事务支持是弱XA的,事务内的SQL在各自分片上执行并且返回状态码,如果某个分片上的返回码为error,mycat就认为本次事务失败了,此时将会一次回滚事务所涉及到的所有分片。反之,如果所有的分片都返回成功的返回码,则当AP(应用程序)提交事务的时候,mycat会同时向事务涉及的所有分片发送提交事务的命令。

之所以说是弱XA,是因为在二阶段提交的工程中,若commit时某个节点出错了,只能回滚,而不会等其恢复后再次提交。

1.6.5开始支持XA事务。

Mycat 实现 XA 标准分布式事务,Mycat 作为XA 事务协调者角色,即使事务过程中 Mycat 宕机挂掉, 由于 Mycat 会记录事务日志,所以 Mycat 恢复后会进行事务的恢复善后处理工作。考虑到分布式事务 的性能开销比较大,所以只推荐在全局表的事务以及其他一些对一致性要 求比较高的场景。

使用

XA 事务需要设置手动提交

  1. set autocommit=0;

开启 XA 事务

  1. set xa=on;

执行sql

  1. insert into city(id,name,province) values(200,'chengdu','sichuan');
  2. update position set salary='300000' where id<5;

提交/回滚事务

  1. commit
  2. rollback;

在更改数据的时候会对行数据进行锁定

12.6 保证Repeatable Read

mycat 有一个特性,就是开事务之后,如果不运行 update/delete/select for update 等更新类语句 SQL 的话,不会将当前连接与当前 session 绑定。

Mysql下笔记 - 图37

这样做的好处是可以保证连接可以最大限度的复用,提升性能。

但是,这就会导致两次 select 中如果有其它的在提交的话,会出现两次同样的 select 不一 致的现象,即不 能 Repeatable Read,这会让人直连 MySQL 的人很困惑,可能会在依赖 Repeatable Read 的场景出现 问题。所以做了一个开关,当 server.xml 的 system 配置了 strictTxIsolation=true 的时候,会关掉这个 特性,以保证 repeatable read,加了开关 后如下图所示:

Mysql下笔记 - 图38

13 运维工具

13.1 Yearning SQL审核

博客:【https://www.jianshu.com/p/784fac087c7d】

简介

Yearning 开源的MySQL SQL语句审核平台,提供数据库字典查询,查询审计,SQL审核等多种功能。

Yearning 1.x 版本需Inception提供SQL审核及回滚功能。

Inception是集审核,执行,回滚于一体的自动化运维系统,它是根据MySQL代码修改过来的,工 作模式和MySQL相同。

Yearning是基于python实现的Web版人机交互界面。

Yearning 2.0 版本开始无需Inception,已自己实现了SQL审核及回滚功能。

Yearning1.0 python版本已不再进行官方维护。

Yearning2.0 golang版本为后续维护项目。

如仍使用python版本Yearning须知: Yearning python版本不会闭源,仍可基于AGPL3.0许可进行二次开发。 由于inception已闭源失去后续支持,python版本将失去对审核规则及审核逻辑的维护。(此问 题即使Yearning项目也无法解决。go版本已实现相关审核逻辑,由Yearning自己维护,保证 后续维护的可控性。) 已知python版本含有多个提权漏洞(用户 -> 管理员) golang版本通过token内嵌角色信息的方 式避免了此类问题。

强烈建议使用Yearning2.0。

功能

Yearning 工具包含的主要功能如下:

  • SQL查询

    • 查询导出
    • 查询自动补全
  • SQL审核

    • 流程化工单
    • SQL语句检测
    • SQL语句执行
    • SQL回滚
    • 历史审核记录
  • 推送

    • 站内信工单通知
    • E-mail工单推送 钉钉
    • webhook机器人工单推送
  • 其他

    • todoList
    • LDAP登陆
  • 用户权限及管理

    • 拼图式细粒度权限划分

13.2 Canal Mysql增量日志解析

canal 译意为水道/管道,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍 生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括以下内容:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

Mysql主从复制

Mysql下笔记 - 图39

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal的工作原理类似mysql主从同步原理:

Mysql下笔记 - 图40

  • canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
  • MySQL master收到dump协议请求,开始推送binary log 给canal
  • canal解析binary log对象(原始为byte流)

应用场景

该技术在拉勾网职位搜索业务中得到了采用,场景:在企业HR发布、更新或删除职位时,我们需要及时更新职位索引(搜索引擎里面的数据),便于求职者能快速的搜索到。

Mysql下笔记 - 图41

官方文档【https://github.com/alibaba/canal】

13.3 DataX 离线数据异构数据源同步

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、 SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

Mysql下笔记 - 图42

设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX, 便能跟已有的数据源做到无缝数据同步。

当前使用现状

DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了多年。 当年每天完成同步8w多道作业,每日传输数据量超过300TB。

框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为 Reader/Writer插件,纳入到整个同步框架中。插件方式,用哪个引入那个插件。

Mysql下笔记 - 图43

Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流 控,并发,数据转换等核心技术问题。

插件体系

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插 件,就能无缝对接其他数据源。经过几年积累,DataX目前已经有了比较全面的插件体系,主流的 RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:

Mysql下笔记 - 图44

核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序 图,从整体架构设计非常简要说明DataX各个模块相互关系。

Mysql下笔记 - 图45

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来 完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切 分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发 执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task 重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所 有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的 线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任 务完成后Job成功退出。否则,异常退出,进程退出值非0

官方文档:【https://github.com/alibaba/DataX】

13.4 Percona-toolkit Mysql运行状态分析

percona-toolkit是一组高级命令行工具的集合,可以查看当前服务的摘要信息,磁盘检测,分析慢查询日志,查找重复索引,实现表同步等等。

安装

地址:https://www.percona.com/downloads/percona-toolkit/LATEST/

解压:tar -zxvf percona-toolkit-3.0.3_x86_64.tar.gz

然后直接进入 bin目录下 使用对应命令就可以

常用命令

pt-query-digest:

pt-query-digest是用于分析mysql慢查询的一个工具,它可以分析binlog、General log、slowlog,也可以通过showprocesslist或者mysqldumpslow命令来进行分析。可以把分析结果输出到文件中,分析过程是先对查询语句的条件进行参数化,然后对参数化以后的查询进行分组统计,统计出各查询的执行 时间、次数、占比等,可以借助分析结果找出问题进行优化。

  1. 直接分析慢查询文件
    1. pt-query-digest slow_OAK.log > slow_report.log
  1. 分析最近24小时内的查询
    1. pt-query-digest --since=24h slow_OAK.log > slow_report.log
  1. 分析只含有select语句的慢查询
    1. pt-query-digest --filter '$event->{fingerprint} =~ m/^select/i'
    2. slow_OAK.log> slow_report.log

查看SQL报告,总结慢语句有哪些,并可以看对应时间的消耗。

  1. cat slow_report.log

pt-index-usage:

pt-index-usage命令能够连接到MySQL数据库服务器,读取慢查询日志,并使用EXPLAIN询问MySQL 如何执行每个查询。分析完成时,它打印出一个关于查询没有使用的索引的报告。

对于我们已有的生产环境,随着系统运行的时间越长,DML操作越来越慢,这可能和我们最初设计的索 引是有关的(变慢的情况很多),项目一旦上线,很少会有人去关注索引的使用情况。某些索引是从 create开始就没使用过,这无形中就给MySQL增加了维护负担,任何对该表的DML操作,都要维护这些 没有被使用的索引。我们可以使用pt-index-usage工具找出哪些索引一直没有被使用,然后进行删除。

  1. pt-index-usage /path/to/slow_OAK.log --host localhost

也可以将报告写入到一个数据库表中,存入后方便我们的查看。如果在生产服务器上使用此功能,则应 该小心,它可能会增加负载。

  1. pt-index-usage slow.log --no-report --save-results-database percona

pt-online-schema-change:

pt-online-schema-change命令可以在线整理表结构,收集碎片,给大表添加字段和索引。避免出现锁 表导致阻塞读写的操作。针对 MySQL 5.7 版本,就可以不需要使用这个命令,直接在线 online DDL 就 可以了。

  1. pt-online-schema-change --user=root --password=root --host=localhost --
  2. alter="ADD COLUMN city_bak VARCHAR(256)" D=lagou,t=city --execute

pt-table-checksum:

pt-table-checksum命令可以检查主从复制一致性。pt table checksum通过在主机上执行校验和查询来 执行在线复制一致性检查。如果发现任何差异,或者出现任何警告或错误,则工具的“退出状态”为非0 该命令将连接到本地主机上的复制主机,对每个表进行校验和,并报告每个检测到的复制副本的结果: 比较lagou库的差异情况,在主库上面执行:

  1. [root@node1 bin]# ./pt-table-checksum --no-check-binlog-format --nocheckreplication-filters --databases=lagou --replicate=lagou.checksums --
  2. host=192.168.95.130 -uroot -proot
  3. TS ERRORS DIFFS ROWS CHUNKS SKIPPED TIME TABLE
  4. 05-10T18:01:05 0 0 1 1 0 0.013 lagou.heartbeat
  5. 05-10T18:01:05 0 0 0 1 0 0.015 lagou.city
  6. 05-10T18:01:05 0 0 0 1 0 0.011 lagou.position

diffs为0说明没有差别

如果只需要列出有差异的表,添加—replicate-check-only参数

  1. [root@node1 bin]# ./pt-table-checksum --no-check-binlog-format
  2. --nocheck-replication-filters --databases=lagou --replicate=lagou.checksums
  3. --replicate-check-only --host=192.168.95.130 -uroot -proot

更多命令参考官网:【https://www.percona.com/doc/percona-toolkit/3.0/index.html】

13.5 Lepus 可视化Mysql监控系统

MySQLMTOP 是一个由Python+PHP开发的开源MySQL企业监控系统。该系统由Python实现多进程数 据采集和告警,PHP实现Web展示和管理,优点如下:

  • MySQL服务器无需安装任何Agent,只需在监控WEB界面配置相关数据库信息启动监控进程后,即可对上百台MySQL数据库的状态、连接数、QTS、TPS、数据库流量、复制、 性能慢查询等进行实时监控。
  • 可以在数据库偏离设定的正常运行阀值(如连接异常,复制异常,复制延迟) 时发送告警邮件通知到DBA进行处理。
  • 可以对历史数据归档,通过图表展示数据库近期状态,以便DBA和开发人员能对遇到的问题进行分析和诊断。
  • 支持Oracle、Mongodb、Redis

功能

  • 实时 MySQL 状态监控和警报
    Lepus 持续监视 MySQL 的基本状态和性能信息,包括数据库连接状态,启动时间,数据 库版本,总连接数,活动进程,QPS,TPS,进出MySQL数据库的流量信息。在数据库状态异常或 偏离正常基准水平时发出报警邮件通知

  • 实时 MySQL复制监控
    MySQLMTOP自动发现 MySQL 复制拓扑结构,自动监视数据库的延时和binlog信息,可以了解所 有 MySQL 主服务器和从服务器的性能、可用性和运行状况。并在问题(如从服务器延迟)导致停机前向管理员提供改正建议。

  • 远程监控云中的 MySQL
    适合于云和虚拟机的设计,能远程监视MySQL服务器不需要任何远程代理器。

  • 直观管理所有 MySQL
    提供一个基于 Web 的界面,可令全面深入地了解数据库性能、可用性、关键活动 等;直观地查看一台服务器、自定义的应用组或所有服务器。一组丰富的实时图形和历史图形将帮 助您深入了解详细的服务器统计信息。

  • 可视化MySQL慢查询分析
    监视实时查询性能,查看执行统计信息,筛选和定位导致性能下降的 SQL 代码。结合使用 Information Schema 可直接从 MySQL 服务器收集数据,无需额外的软件或配置。

  • 性能监控
    监视影响 MySQL 性能的主要指标。如Key_buffer_read_hits、Key_buffer_write_hits、 Thread_cache_hits、Key_blocks_used_rate、Created_tmp_disk_tables_rate等信息,根据相关 性能指标可以对服务器核心参数进行调整优化。

官网:【http://www.lepus.cc/】

13.6 ELK

为啥用ELK

在简单应用中,直接在日志文件中 grep就可以获得自己想要的信息。但在规模较大分布式系统中,此 方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中 化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点 上的日志统一收集,管理,访问。

一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分 情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块。构建一套集中式日志系统,可以 提高定位问题的效率。

一个完整的集中式日志系统,需要包含以下几个主要特点:

  • 收集-能够采集多种来源的日志数据
  • 传输-能够稳定的把日志数据传输到中央系统
  • 存储-如何存储日志数据
  • 分析-可以支持 UI 分析
  • 警告-能够提供错误报告,监控机制

实现架构介绍

ELK 最早是 Elasticsearch(简称ES)、Logstash、Kibana 三款开源软件的简称,三款软件后来被同一 公司收购,并加入了Xpark、Beats等组件,改名为Elastic Stack,成为现在最流行的开源日志解决方 案,虽然有了新名字但大家依然喜欢叫ELK,现在所说的ELK就指的是基于这些开源软件构建的日志系 统。

Mysql下笔记 - 图46

  • MySQL 服务器安装 Filebeat 作为 agent 收集 slowLog

  • Filebeat 读取 MySQL 慢日志文件做简单过滤传给 Kafka 集群

  • Logstash 读取 Kafka 集群数据并按字段拆分后转成 JSON 格式存入 ES 集群

  • Kibana 读取ES集群数据展示到web页面上

各软件示例配置

filebeat:

filebeat读取收集slow log,处理后写入kafka。收集日志时需要解决以下问题:

  • 日志行合并
  • 获取SQL执行时间
  • 确定SQL对应的DB名
  • 确定SQL对应的主机

示例配置:

Mysql下笔记 - 图47

参数:

  • input_type:指定输入的类型是log或者是stdin
  • paths:慢日志路径,支持正则,比如/data/*.log
  • exclude_lines:过滤掉#Time开头的行
  • multiline.pattern:匹配多行时指定正则表达式,这里匹配以# Time或者# User开头的行, Time行要先匹配再过滤
  • multiline.negate:定义上边pattern匹配到的行是否用于多行合并,也就是定义是不是作为日志 的一部分
  • multiline.match:定义匹配的行日志作为日志元素的开始还是结束。
  • tail_files:定义是从文件开头读取日志还是结尾,这里定义为true,从现在开始收集,之前已存在的不管
  • name:设置filebeat的名字,如果为空则为服务器的主机名,这里我们定义为服务器IP
  • output.kafka:配置要接收日志的kafka集群地址可topic名称

Kafka接收示例:

  1. {"@timestamp":"2020-05-08T09:36:00.140Z","beat":
  2. {"hostname":"oak","name":"10.63.144.71","version":"5.4.0"},"input_type":"log","m
  3. essage":"# User@Host: select[select] @ [10.63.144.16] Id: 23460596\n#
  4. Query_time: 0.155956 Lock_time: 0.000079 Rows_sent: 112 Rows_examined:
  5. 366458\nSET timestamp=1533634557;\nSELECT DISTINCT(uid) FROM common_member WHERE
  6. hideforum=-1 AND uid !=
  7. 0;","offset":1753219021,"source":"/data/slow/mysql_slow.log","type":"log"}

Logstash:

Logstash消费kafka消息,可以利用kafka consumer group实现集群模式消费保障单点故障不影响日志 处理,grok正则处理后写入elasticsearch。

Mysql下笔记 - 图48

Mysql下笔记 - 图49

参数:

  • input:配置 kafka 的集群地址和 topic 名字
  • filter:过滤日志文件,主要是对 message 信息(前文 kafka 接收到的日志格式)进行拆分,拆 分成一个一个易读的字段,例如User、Host、Query_time、Lock_time、timestamp等。
  • grok:MySQL版本不同,慢查询日志格式有些差异,grok可以根据不同的慢查询格式写不同的正则表达式去匹配,当有多条正则表达式存在时,logstash会从上到下依次匹配,匹配到一条后边的 则不再匹配。
  • date:定义让SQL中的timestamp_mysql字段作为这条日志的时间字段,kibana上看到的实践排序的数据依赖的就是这个时间
  • output:配置ES服务器集群的地址和index,index自动按天分割

ELK的还有很多其他方面的使用。

13.7 Prometheus

Prometheus于2012年由SoundCloud创建,目前已经已发展为最热门的分布式监控系统。

Prometheus完全开源的,被很多云厂商(架构)内置,在这些厂商(架构)中,可以简单部署 Prometheus,用来监控整个云基础架构设施。比如DigitalOcean或Docker都使用普罗米修斯作为基础 监控。

Prometheus是一个时间序列数据库,它涵盖了可以绑定的整个生态系统工具集及其功能。

Prometheus主要用于对基础设施的监控,包括服务器、数据库、VPS,几乎所有东西都可以通过 Prometheus进行监控。

Prometheus主要优点如下:

  • 提供多维度数据模型和灵活的查询方式。通过将监控指标关联多个tag,来将监控数据进行任意维 度的组合,并且提供简单的PromQL,还提供查询接口,可以很方便地结合等GUI组件展示数据。
  • 在不依赖外部存储的情况下,支持服务器节点的本地存储。通过Prometheus自带的时序数据库, 可以完成每秒千万级的数据存储。
  • 定义了开发指标数据标准,以基于HTTP的pull方式采集时序数据。只有实现了Prometheus监控数 据格式的监控数据才可以被Prometheus采集、汇总。
  • 支持通过静态文件配置和动态发现机制发现监控对象,自动完成数据采集。Prometheus目前已经 支持Kubernetes、etcd、consul等多种服务发现机制,可以减少运维人员的手动配置环节。
  • 易于维护,可以通过二进制文件直接启动,并且提供容器化部署镜像。
  • 支持数据的分区采集和联邦部署,支持大规模集群监控。

Prometheus生态系统

Alertmanager:Prometheus通过配置文件定义规则将告警信息推送到Alertmanager。Alertmanager 可以将其导出到多个端点,例如Pagerduty或Email等。

数据可视化:在Web UI中可视化时间序列数据,轻松过滤查看监控目标的信息,与Grafana、Kibana等 类似。

服务发现:Prometheus可以动态发现监控目标,并根据需要自动废弃目标。这在云架构中使用动态变 更地址的容器时,非常方便。

Mysql下笔记 - 图50

Prometheus实现原理

数据存储:Prometheus指标数据支持本地存储和远程存储。

Mysql下笔记 - 图51

指标数据:Prometheus使用键-值对存储监控数据。键描述了测量值时将实际测量值存储为数字的值。 Prometheus并不会存储原始信息,如日志文本,它存储的是随时间汇总的指标。 一般来说键也就监控指标,如果想要获得有关指标的更多详细信息,Prometheus有一个标签的概念。 标签旨在通过向键添加其他字段来为指标提供更详细信息。

Mysql下笔记 - 图52

度量类型

Prometheus监控指标有Counter(计数器)、Gauge(仪表盘)、Histogram(直方图)和Summary(摘要)四种 度量类型。

  • Counter 计数器
    Counter是我们使用的最简单的度量标准形式。计数器的值只能增加或重置为0,比如,要 计算服务器上的HTTP错误数或网站上的访问次数,这时候就使用计数器。

  • Gauges
    Gauges用于处理随时间减少或增加的值。比如温度、内存变化等。Gauge类型的值可以上升和下 降,可以是正值或负值。

  • Histogram 直方图
    Histogram是一种更复杂的度量标准类型。它为我们的指标提供了额外信息,例如观察值的 总和及其数量,常用于跟踪事件发生的规模。其值在具有可配置上限的存储对象中聚合。比如,为 了监控性能指标,我们希望得到在有20%的服务器请求响应时间超过300毫秒发送警告。对于涉及 比例的指标就可以考虑使用直方图。

  • Summary 摘要
    Summary是对直方图的扩展。除了提供观察的总和和计数之外,它们还提供滑动窗口上的分 位数度量。分位数是将概率密度划分为相等概率范围的方法。直方图随时间汇总值,给出总和和计 数函数,使得易于查看给定度量的变化趋势。而摘要则给出了滑动窗口上的分位数(即随时间不断 变化)。

PromQL

对于Prometheus数据,我们可以通过HTTP来查询,如果是复杂的数据查询,则还可以使用PromQL进 行。和关系型数据库实现SQL解析一样,Prometheus实现了一套自己的数据库语言解析器,最大的区 别是支持查询。

使用Prometheus的PromQL,会处理两种向量:

  • 即时向量:表示在最近时间戳中跟踪的指标。
  • 时间范围向量:用于查看度量随时间的演变,可以使用自定义时间范围查询Prometheus。

PromQL API公开了一组方便查询数据操作的函数。用它可以实现排序,数学函计算(如导数或指数函 数),统计预测计算(如Holt Winters函数)等。

Instrumentation仪表化

仪表化是Prometheus的一个重要组成部分。在从应用程序检索数据之前,必须要仪表化它们。 Prometheus术语中的仪表化表示将客户端类库添加到应用程序,以便它们向Prometheus吐出指标。 可以对大多数主流的编程语言进行仪表化。

在仪表化操作时,需要创建内存对象(如仪表盘或计数器),然后选择指标公开的位置。Prometheus 将从该位置获取并存储到时间序列数据库。

Exporters模板

Exporter是一个采集监控数据样本的组件。除了官方实现的Exporter,还有很多第三方实现如 Redis exporter 和RabbitMQ exporter等。

这些Exporters通过HTTPS/HTTP方式、TCP方式、本地文件方式 或标准协议方式访问。 常见的Exporters模版有:

  • 数据库模版:用于MongoDB数据库,MySQL服务器的配置。
  • HTTP模版:用于HAProxy,Apache或Nginx等Web服务器和代理的配置。
  • Unix模版:用来使用构建的节点导出程序监视系统性能,可以实现完整的系统指标的监控。

Mysql下笔记 - 图53

Altermanager告警

在处理时间序列数据库时,我们希望对数据进行处理,并对结果给出反馈,而这部分工作由告警来实现。

告警在Grafana中非常常见,Prometheus是通过Alertmanager实现完成的告警系统。Alertmanager是 一个独立的工具,可以绑定到Prometheus并运行自定义Alertmanager。告警通过配置文件定义,定义 由一组指标规则组成,如果数据命中这些规则,则会触发告警并将信息发送到预定义的目标。 Prometheus的告警,可以通过email,Slack webhooks,PagerDuty和自定义HTTP目标等。

Mysql下笔记 - 图54