前言

分布式ID在多种场景下都有广泛的应用,如分库分表后生成全局用户Id,订单Id,分布式事物中的全局Id,消息队列中的全局消息Id.因此本文分析一下常用的分布式Id的常见实现方案。

SnowFlake-雪花算法

SnowFlake是Twitter最早提出的一种全局ID生成算法,可以产生一个Time-Based的全局ID, 由于生成简单、ID趋势递增,业界采用的比较广泛。比如在snowflake中的64-bit分别表示如下图(图片来自网络)所示:

snowflake-structure.png

41-bit的时间可以表示(1L<<41)/(1000L_3600_24*365)=69年的时间,10-bit机器可以分别表示1024台机器。如果我们对IDC划分有需求,还可以将10-bit分5-bit给IDC,分5-bit给工作机器。这样就可以表示32个IDC,每个IDC下可以有32台机器,可以根据自身需求定义。12个自增序列号可以表示2^12个ID,理论上snowflake方案的QPS约为409.6w/s,这种分配方式可以保证在任何一个IDC的任何一台机器在任意毫秒内生成的ID都是不同的。

SnowFlakeID是一个64 bit,8 bytes的整数类型,结构如下:
bit[0]:最高位填0,保证ID是正整数。
bit[1-41]:时间戳,41 bit。表示自某个起始时间(可自行设定)以来的毫秒数,支持69年跨度。
bit[42-51]:Worker ID,10 bits。表示节点的唯一标识,能同时支持1024个不同的Worker。Worker ID可依赖外部配置中心生成,推荐前5位数据中心ID,后5位PID。
bit[52-63]:并发ID序列号,12 bits。用于同一毫秒内并发产生的ID自增序号,采用原子递增计数器实现,每毫秒重新归0,一毫秒内可以并发产生4096个ID。如果在这个毫秒内生成的数量超过4096,可以阻塞等待到下个毫秒来生成。

这种方式的优缺点

1)本地化生成,算法简单,效率高
2)适合主键字段:时间戳位于ID的高位,毫秒内自增序列在低位,ID趋势递增;长度8个字节,适合数据库存储。
3)不足之处:
3.1)依赖机器时钟,如果时钟错误比如时钟不同步、时钟回拨,会产生重复ID
3.2)每个节点的Worker ID要借助外部服务比如Zookeeper、Redis、MySQL分配
ID容量局限性:时间偏移量支持2^41ms=69年,可以在算法中自定义起始时间,年限略短,一般够用。

Twitter SnowFlake实现

SnowFlake算法用来生成64位的ID,刚好可以用long整型存储,能够用于分布式系统中生产唯一的ID, 并且生成的ID有大致的顺序。 在这次实现中,生成的64位ID可以分成5个部分:
0 - 41位时间戳 - 5位数据中心标识 - 5位机器标识 - 12位序列号
5位数据中心标识跟5位机器标识这样的分配仅仅是当前实现中分配的,如果业务有其实的需要,可以按其它的分配比例分配,如10位机器标识,不需要数据中心标识。

  1. /**
  2. * twitter的snowflake算法 -- java实现
  3. * @see @{https://github.com/beyondfengyu/SnowFlake?spm=ata.13261165.0.0.78d12ace8n92XX}
  4. * @author beyond
  5. * @date 2016/11/26
  6. */
  7. public class SnowFlake {
  8. /**
  9. * 起始的时间戳
  10. */
  11. private final static long START_STMP = 1480166465631L;
  12. /**
  13. * 每一部分占用的位数
  14. */
  15. private final static long SEQUENCE_BIT = 12; //序列号占用的位数
  16. private final static long MACHINE_BIT = 5; //机器标识占用的位数
  17. private final static long DATACENTER_BIT = 5;//数据中心占用的位数
  18. /**
  19. * 每一部分的最大值
  20. */
  21. private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
  22. private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
  23. private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
  24. /**
  25. * 每一部分向左的位移
  26. */
  27. private final static long MACHINE_LEFT = SEQUENCE_BIT;
  28. private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
  29. private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;
  30. private long datacenterId; //数据中心
  31. private long machineId; //机器标识
  32. private long sequence = 0L; //序列号
  33. private long lastStmp = -1L;//上一次时间戳
  34. public SnowFlake(long datacenterId, long machineId) {
  35. if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
  36. throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
  37. }
  38. if (machineId > MAX_MACHINE_NUM || machineId < 0) {
  39. throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
  40. }
  41. this.datacenterId = datacenterId;
  42. this.machineId = machineId;
  43. }
  44. /**
  45. * 产生下一个ID
  46. *
  47. * @return
  48. */
  49. public synchronized long nextId() {
  50. long currStmp = getNewstmp();
  51. if (currStmp < lastStmp) {
  52. throw new RuntimeException("Clock moved backwards. Refusing to generate id");
  53. }
  54. if (currStmp == lastStmp) {
  55. //相同毫秒内,序列号自增
  56. sequence = (sequence + 1) & MAX_SEQUENCE;
  57. //同一毫秒的序列数已经达到最大
  58. if (sequence == 0L) {
  59. currStmp = getNextMill();
  60. }
  61. } else {
  62. //不同毫秒内,序列号置为0
  63. sequence = 0L;
  64. }
  65. lastStmp = currStmp;
  66. return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
  67. | datacenterId << DATACENTER_LEFT //数据中心部分
  68. | machineId << MACHINE_LEFT //机器标识部分
  69. | sequence; //序列号部分
  70. }
  71. private long getNextMill() {
  72. long mill = getNewstmp();
  73. while (mill <= lastStmp) {
  74. mill = getNewstmp();
  75. }
  76. return mill;
  77. }
  78. private long getNewstmp() {
  79. return System.currentTimeMillis();
  80. }
  81. public static void main(String[] args) {
  82. SnowFlake snowFlake = new SnowFlake(1<<4, 1<<4);
  83. for (int i = 0; i < 5; i++) {
  84. System.out.println(snowFlake.nextId());
  85. }
  86. }
  87. }

Java UUID

UUID(Universally Unique IDentifier)是一个全局Id的规范,标准型式包含32个16进制数字,以连字号分为五段,形式为8-4-4-4-12的36个字符.示例如:78f236de-a628-4ea7-9382-b6030fcd8454. 典型实现如下:

  1. for (int i = 0; i < 10; i++) {
  2. UUID uuid = UUID.randomUUID();
  3. System.out.println(uuid.toString());
  4. }

优点是:本地生成,性能较好。缺点:不易于存储:UUID太长,16字节128位,通常以36长度的字符串表示,很多场景不适用,无顺序,不利于作DB主键。

数据库批量获取

创建一个sequence表,用name表示序列名称,一般用于某业务,用value表示当前值,程序获取到当前值后在内存里缓存一个区间。区间最大值为配置的步长。关键逻辑为:
1)程序初始化,先根据name查询value,如果不存在则初始化0.
2)调用nextRange时,查询当前value,然后计算一个新value=oldValue+step.
3) 最后update sequence表的value字段为newValue。

单数据库批量获取实现

准备数据库表

  1. CREATE TABLE `sequence` (
  2. `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  3. `name` varchar(64) NOT NULL,
  4. `value` bigint(20) NOT NULL,
  5. `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  6. PRIMARY KEY (`id`),
  7. UNIQUE KEY `unique_name` (`name`)
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

实现代码如下

SequenceRange定义:

  1. /**
  2. * 定义一个从数据库获取的序列区间
  3. * 该区间表示从数据库里获取一个起始值,然后加上一个步长
  4. *
  5. * @author xiele.xl
  6. * @date 2020-05-19 18:15
  7. */
  8. public class SequenceRange {
  9. /**
  10. * 最小值
  11. */
  12. private final long min;
  13. /**
  14. * 区间最大值
  15. */
  16. private final long max;
  17. /**
  18. * 区间记录值
  19. */
  20. private AtomicLong value;
  21. /**
  22. * 是否到达最大值
  23. */
  24. private volatile boolean over = false;
  25. public SequenceRange(long min, long max) {
  26. this.min = min;
  27. this.max = max;
  28. this.value = new AtomicLong(min);
  29. }
  30. /**
  31. * 获取递增值
  32. * @return
  33. */
  34. public long getAndIncrement() {
  35. if (over) {
  36. return -1;
  37. }
  38. long current = value.getAndIncrement();
  39. if (current > this.max) {
  40. over = true;
  41. return -1;
  42. }
  43. return current;
  44. }
  45. /**
  46. * Getter method for property <tt>min</tt>.
  47. *
  48. * @return property value of min
  49. */
  50. public long getMin() {
  51. return min;
  52. }
  53. /**
  54. * Getter method for property <tt>max</tt>.
  55. *
  56. * @return property value of max
  57. */
  58. public long getMax() {
  59. return max;
  60. }
  61. /**
  62. * Getter method for property <tt>over</tt>.
  63. *
  64. * @return property value of over
  65. */
  66. public boolean isOver() {
  67. return over;
  68. }
  69. /**
  70. * Setter method for property <tt>over</tt>.
  71. *
  72. * @param over value to be assigned to property over
  73. */
  74. public void setOver(boolean over) {
  75. this.over = over;
  76. }
  77. }

序列服务实现:

  1. /**
  2. * @author xiele.xl
  3. * @date 2020-05-19 17:20
  4. */
  5. public class SimpleBatchSequence implements SequenceService {
  6. private static final String jdbc_url = "jdbc:mysql://localhost:3306/spring_demo";
  7. private static final String user = "root";
  8. private static final String password = "Xiele";
  9. // 定义可见变量
  10. private volatile SequenceRange range;
  11. private static final String DEFAULT_SEQ_NAME = "default";
  12. private static final String SELECT_SQL = "select value from sequence where name = ?";
  13. private ReentrantLock lock = new ReentrantLock();
  14. private DataSource dataSource ;
  15. public Connection getConnection() throws SQLException {
  16. return dataSource.getConnection();
  17. }
  18. public void init() throws SQLException {
  19. DruidDataSource ds = new DruidDataSource();
  20. ds.setUrl(jdbc_url);
  21. ds.setUsername(user);
  22. ds.setPassword(password);
  23. ds.init();
  24. setDataSource(ds);
  25. initValue(DEFAULT_SEQ_NAME);
  26. }
  27. public void initValue(String name) throws SQLException {
  28. Connection connection = null;
  29. PreparedStatement pst = null;
  30. ResultSet rs = null;
  31. try {
  32. connection = getConnection();
  33. pst = connection.prepareStatement(SELECT_SQL);
  34. pst.setString(1, name);
  35. rs = pst.executeQuery();
  36. int result = 0;
  37. while (rs.next()) {
  38. result++;
  39. }
  40. // 不存在插入初始化值
  41. if (result == 0) {
  42. pst = connection.prepareStatement(
  43. "insert sequence (`name`,`value`,`gmt_modified`) value (?,?,?)");
  44. pst.setString(1, name);
  45. pst.setLong(2, 0L);
  46. pst.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
  47. final int effectedRow = pst.executeUpdate();
  48. Assert.state(effectedRow == 1, "insert init value failed");
  49. }
  50. } finally {
  51. closeDbResource(pst, rs, connection);
  52. }
  53. }
  54. /**
  55. * 获取一个序列区间
  56. *
  57. * @param name
  58. * @return
  59. * @throws SQLException
  60. */
  61. public SequenceRange nextRange(String name) {
  62. Connection connection = null;
  63. PreparedStatement pst = null;
  64. ResultSet rs = null;
  65. long oldValue = 0L;
  66. long newValue = 0L;
  67. // 先取出当前值
  68. try {
  69. connection = getConnection();
  70. pst = connection.prepareStatement(SELECT_SQL);
  71. pst.setString(1, name);
  72. rs = pst.executeQuery();
  73. rs.next();
  74. oldValue = rs.getLong("value");
  75. // 校验value的范围
  76. if (oldValue < 0) {
  77. throw new RuntimeException("not expected sequence value " + oldValue);
  78. }
  79. if (oldValue > Long.MAX_VALUE) {
  80. throw new RuntimeException("sequence value is exceeded max long value");
  81. }
  82. newValue = oldValue + getStep();
  83. } catch (SQLException e) {
  84. e.printStackTrace();
  85. } finally {
  86. closeDbResource(pst, rs, connection);
  87. }
  88. // 在更新新值到db里
  89. try {
  90. connection = getConnection();
  91. pst = connection.prepareStatement(
  92. "update sequence set value = ?, gmt_modified = ? where name = ? and value = ?");
  93. pst.setLong(1, newValue);
  94. pst.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
  95. pst.setString(3, name);
  96. pst.setLong(4, oldValue);
  97. Assert.state(pst.executeUpdate() == 1, "update failed");
  98. } catch (SQLException e) {
  99. e.printStackTrace();
  100. } finally {
  101. closeDbResource(pst, rs, connection);
  102. }
  103. return new SequenceRange(oldValue + 1, newValue);
  104. }
  105. @Override
  106. public long nextValue() {
  107. if (range == null) {
  108. lock.lock();
  109. try {
  110. if (range == null) {
  111. range = nextRange(DEFAULT_SEQ_NAME);
  112. System.err.println(String.format("初次获取Range,线程编号=%s,获取成功", Thread.currentThread().getName()));
  113. }
  114. } finally {
  115. lock.unlock();
  116. }
  117. }
  118. long value = range.getAndIncrement();
  119. // 表示本区间已取完,需要重新从db里拿一个起始值
  120. // 由于存在多个线程同时来拿,因此只有一个线程可以拿成功
  121. // 如果多个线程同时拿完了,并发进入==-1,再次加锁,获取下一段。
  122. if (value == -1) {
  123. lock.lock();
  124. try {
  125. for (; ; ) {
  126. if (range.isOver()) {
  127. range = nextRange(DEFAULT_SEQ_NAME);
  128. System.err.println(
  129. String.format("用完Range,再次获取Range,线程编号=%s,获取成功", Thread.currentThread().getName()));
  130. }
  131. value = range.getAndIncrement();
  132. if (value == -1) {
  133. continue;
  134. }
  135. break;
  136. }
  137. } finally {
  138. lock.unlock();
  139. }
  140. }
  141. return value;
  142. }
  143. /**
  144. * 默认步长,可配置
  145. *
  146. * @return
  147. */
  148. public long getStep() {
  149. return 1000;
  150. }
  151. /**
  152. * Setter method for property <tt>dataSource</tt>.
  153. *
  154. * @param dataSource value to be assigned to property dataSource
  155. */
  156. public void setDataSource(DataSource dataSource) {
  157. this.dataSource = dataSource;
  158. }
  159. private void closeDbResource(Statement st, ResultSet rs, Connection conn) {
  160. closeStatement(st);
  161. closeResultSet(rs);
  162. closeConnection(conn);
  163. }
  164. private void closeStatement(Statement st) {
  165. if (st != null) {
  166. try {
  167. st.close();
  168. } catch (SQLException e) {
  169. e.printStackTrace();
  170. }
  171. }
  172. }
  173. private void closeResultSet(ResultSet rs) {
  174. if (rs != null) {
  175. try {
  176. rs.close();
  177. } catch (SQLException e) {
  178. e.printStackTrace();
  179. }
  180. }
  181. }
  182. private void closeConnection(Connection connection) {
  183. if (connection != null) {
  184. try {
  185. connection.close();
  186. } catch (SQLException e) {
  187. e.printStackTrace();
  188. }
  189. }
  190. }
  191. public static void main(String[] args) throws SQLException, InterruptedException {
  192. // 测试case1:单线程测试正确性
  193. SimpleBatchSequence sequence = new SimpleBatchSequence();
  194. sequence.init();
  195. //int count = 1001;
  196. //while (count-- > 0) {
  197. // System.out.println("nextVal: " + sequence.nextValue());
  198. //}
  199. // 多线程并发获取:测试并发性
  200. final int nThreads = 100;
  201. final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
  202. CountDownLatch latch = new CountDownLatch(nThreads);
  203. for (int i = 0; i < nThreads; i++) {
  204. executor.execute(() -> {
  205. int cnt = 10;
  206. while (cnt-- > 0) {
  207. System.out.println(String
  208. .format("currentThreadName:%s. nextVal: %d", Thread.currentThread().getName(),
  209. sequence.nextValue()));
  210. }
  211. latch.countDown();
  212. });
  213. }
  214. latch.await();
  215. System.out.println("Latch end");
  216. executor.shutdown();
  217. System.out.println("executor shutdown");
  218. System.out.println(executor);
  219. }
  220. }

多数据库实例的批量获取

实际工程应用中需要构建多数据源集群来保证高可用,可扩展的发号能力。
多数据源要解决的问题同样是各个数据源要确保生成互不重叠的ID序列,即将原来的自然数序列分成若干份互不相交的子等差数列。实现原理如下:把一个大批量ID段(外步长)按数据源分成多个小批量ID段(内步长),每个dataSource每次分配一个内步长的ID段,然后按照外步长跃进;通过这种错位交替分配策略,可以使不同dataSource产生的ID段互不相交。
在这种场景下,有多个db里有相同的Sequence表,涉及到内步长innerStep与外步长outStep

multi_datasource_step.png

实现代码思路

初始化阶段:

  1. 配置数据库实例数量dsCount,内部长innerStep,默认为1000, 计算外步长outStep=dsCount * innerStep.
  2. 自动调节sequence表:
    如果sequence里无初始值,则分别初始化每个db实例里的初始值。例如:以内步长为1000,dsCount=2为例,则ds1库里Sequence表里的value序列应该为为0,1000.ds2库里Sequence表里的value序列应该为1000,2000
    如果已经存在value,验证当前value是否需要重新需要校正一下。

获取Range阶段

随机从ds里拿取一个value,然后计算一下newValue=value+outStep,在update value为newValue.

group_sequence_next_range.jpg

单一数据库表自增

创建一个Sequence表,用MySQL的自增Id机制来生成序列。采用replace into 语法加select last_insert_id,由于两个sql预计不是原子的,需要包含在一个事物里。

Sequence表结构如下:

  1. CREATE TABLE `sequence` (
  2. `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  3. `name` char(8) NOT NULL,
  4. PRIMARY KEY (`id`),
  5. UNIQUE KEY `uk_name` (`name`)
  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8

事物中获取Id的SQL脚本类似如下:

  1. begin;
  2. replace into sequence(`name`) value ('default2');
  3. select last_insert_id();
  4. commit;

注:Mysql可配置自增起始值(@@auto_increment_increment)与步长(@@auto_increment_offset)来控制Id的生成策略,不同的业务可用不同的序列表来控制对应的Id序列。

JDBC实现代码

  1. import java.sql.Connection;
  2. import java.sql.DriverManager;
  3. import java.sql.PreparedStatement;
  4. import java.sql.ResultSet;
  5. import java.sql.SQLException;
  6. /**
  7. * 单一数据库表自增Id实现序列服务
  8. * 实现思路为:事物中执行replace into && select last_insert_id()
  9. *
  10. * @author xiele.xl
  11. * @date 2020-05-19 11:04
  12. */
  13. public class SingleSequenceService implements SequenceService {
  14. private static final String jdbc_url = "jdbc:mysql://localhost:3306/spring_demo";
  15. private static final String user = "root";
  16. private static final String password = "Xiele";
  17. private static ThreadLocal<Connection> localConn = new ThreadLocal<>();
  18. static {
  19. try {
  20. Class.forName("com.mysql.jdbc.Driver");
  21. } catch (Exception e) {
  22. throw new RuntimeException("Can't register driver!");
  23. }
  24. }
  25. public static Connection getConnection() {
  26. try {
  27. Connection conn = localConn.get();
  28. if (conn == null) {
  29. conn = DriverManager.getConnection(jdbc_url, user, password);
  30. localConn.set(conn);
  31. }
  32. return conn;
  33. } catch (Exception e) {
  34. throw new RuntimeException("can not get connection, e=" + e);
  35. }
  36. }
  37. @Override
  38. public long nextId() {
  39. Connection con = getConnection();
  40. try {
  41. // 设置事物为非自动提交,并开启事物
  42. con.setAutoCommit(false);
  43. final PreparedStatement ps = con.prepareStatement(
  44. "replace into `sequence` (name) values ('default')");
  45. ps.executeUpdate();
  46. final ResultSet rs = ps.executeQuery("select LAST_INSERT_ID()");
  47. rs.next();
  48. long nextId = rs.getLong(1);
  49. con.commit();
  50. return nextId;
  51. } catch (SQLException e) {
  52. try {
  53. con.rollback();
  54. } catch (SQLException ex) {
  55. }
  56. throw new RuntimeException(e);
  57. }
  58. }
  59. public static void main(String[] args) {
  60. SingleSequenceService uss = new SingleSequenceService();
  61. for (int i = 0; i < 10; i++) {
  62. System.out.println(uss.nextId());
  63. }
  64. }
  65. }

其他方案

Redis原子命令实现递增

Redis提供一些自增原子命令,可以对KEY的值进行原子的增量操作,把返回值作为新的ID。Redis自增命令有:
INCRBY :给key对应的value加上1,value必须是数值类型。
INCR :为key对应的value加上指定的增量值。
HINCRBY :给HASH key中的field指定的域加上指定的增量值。

方案总结

  1. Redis方案和基于数据库的方案本质是一样的,只是发号源由数据库换成了Redis、事务由Redis的单线程机制来保证;也可以类似数据库批量方式,一次生成一批ID。
  2. Redis的ID是一个64 bit signed integer,容量可以充分保证。
  3. 可以保证ID的趋势递增,适用于主键字段
  4. 不足之处:Redis的AOF、RDB两种持久化方式都无法绝对保证数据不丢失,重启Redis有可能产生重复ID。

Zookeeper顺序节点实现递增

Zookeeper作为分布式协调服务框架,提供多种类型的ZNode来存储数据,其中顺序节点(Sequence Node)可以用来生成单调自增序列。它的机制如下:

在同一个路径中创建任意顺序节点(不需要同名),Zookeeper都会在节点名称中追加一个单调增长的计数值,格式是左补0的10位数字,如:”0000000001”、”0000000002”等
Zookeeper集群能保证多个客户端并发创建顺序节点时,只有一个会争抢成功,保证并发的一致性。
顺序节点可以是持久化的,需要应用自行删除;也可以是临时的,创建该节点的客户端断开后,ZK会自动删除。两种方式都不会影响序列的增长。

方案总结

  1. 操作简单,ID单调递增
  2. ID上限:顺序节点的序号生成是由其父节点维持的一个计数器生成的,计数器是一个4字节的signed整数,因此ID的最大值是2147483647,超出就会溢出。
  3. ZooKeeper只能顺序发号,无法批量创建ID,交易性能存在瓶颈,不适用于高并发的发号场景。

参考

Leaf——美团点评分布式ID生成系统

微信序列号生成器架构设计及演变