一、leaf初始化

  • 生成workerID 利用zk顺序节点,不同集群节点的workerID不同
  • 创建临时节点 创建零时节点,并且在节点写入ip、端口、本机时间,

    1. {"ip":"10.194.86.217","port":"8080","timestamp":1644827355855}
  • 心跳检查 为了防止leaf掉线可能导致垃圾数据,对齐心跳检查活跃状态,通过心跳检查节点时钟回拨

  • 时钟回拨 通过心跳检查时钟回拨

    1. public boolean init() {
    2. try {
    3. CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
    4. curator.start();
    5. Stat stat = curator.checkExists().forPath(PATH_FOREVER);
    6. if (stat == null) {
    7. //不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据
    8. zk_AddressNode = createNode(curator);
    9. //worker id 默认是0
    10. updateLocalWorkerID(workerID);
    11. //定时上报本机时间给forever节点
    12. ScheduledUploadData(curator, zk_AddressNode);
    13. return true;
    14. } else {
    15. Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001
    16. Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)
    17. //存在根节点,先检查是否有属于自己的根节点
    18. List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
    19. for (String key : keys) {
    20. String[] nodeKey = key.split("-");
    21. realNode.put(nodeKey[0], key);
    22. nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
    23. }
    24. Integer workerid = nodeMap.get(listenAddress);
    25. if (workerid != null) {
    26. //有自己的节点,zk_AddressNode=ip:port
    27. zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
    28. workerID = workerid;//启动worder时使用会使用
    29. if (!checkInitTimeStamp(curator, zk_AddressNode)) {
    30. throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
    31. }
    32. //准备创建临时节点
    33. doService(curator);
    34. updateLocalWorkerID(workerID);
    35. LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID);
    36. } else {
    37. //表示新启动的节点,创建持久节点 ,不用check时间
    38. String newNode = createNode(curator);
    39. zk_AddressNode = newNode;
    40. String[] nodeKey = newNode.split("-");
    41. workerID = Integer.parseInt(nodeKey[1]);
    42. doService(curator);
    43. updateLocalWorkerID(workerID);
    44. LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID);
    45. }
    46. }
    47. } catch (Exception e) {
    48. LOGGER.error("Start node ERROR {}", e);
    49. try {
    50. Properties properties = new Properties();
    51. properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
    52. workerID = Integer.valueOf(properties.getProperty("workerID"));
    53. LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
    54. } catch (Exception e1) {
    55. LOGGER.error("Read file error ", e1);
    56. return false;
    57. }
    58. }
    59. return true;
    60. }

    二、leaf获取snowflakeid

    这段代码就是snowflake的核心算法,需要了解其算法看snowflake算法. ```plsql public synchronized Result get(String key) {

    1. //获取时间戳
    2. long timestamp = timeGen();
    3. //如果当前时间戳小于最后一次生成id时间戳
    4. if (timestamp < lastTimestamp) {
    5. long offset = lastTimestamp - timestamp;
    6. if (offset <= 5) {
    7. try {
    8. //线程暂停
    9. wait(offset << 1);
    10. timestamp = timeGen();
    11. //再次获取当前时间,如果还小于最后时间抛出异常
    12. if (timestamp < lastTimestamp) {
    13. return new Result(-1, Status.EXCEPTION);
    14. }
    15. } catch (InterruptedException e) {
    16. LOGGER.error("wait interrupted");
    17. return new Result(-2, Status.EXCEPTION);
    18. }
    19. } else {
    20. return new Result(-3, Status.EXCEPTION);
    21. }
    22. }
    23. //如果当前时间等于最后生成key时间戳
    24. if (lastTimestamp == timestamp) {
    25. //序列号自动加一,目的防止生成key重复
    26. sequence = (sequence + 1) & sequenceMask;
    27. //?这段代码什么目的?谁知道留言!
    28. if (sequence == 0) {
    29. //seq 为0的时候表示是下一毫秒时间开始对seq做随机
    30. sequence = RANDOM.nextInt(100);
    31. timestamp = tilNextMillis(lastTimestamp);
    32. }
    33. } else {
    34. //如果是新的ms开始
    35. sequence = RANDOM.nextInt(100);
    36. }
    37. lastTimestamp = timestamp;
    38. /*
    39. * 1、(timestamp - twepoch) 这步很重要,如果不做这步会从1970-01-01 08:00:00开始计时,预计2042年结束计时,减去START_TIME,可以从START_TIME时间点计时,让snowflake寿命更长!!!
    40. * 2、timestampLeftShift为啥是22? SnowFlake算法是计算用69年,这个时间戳移位22不超过long.max,故最大只能用22,防止越界.
    41. * “1970-01-01 08:00:00的时间戳是0,加上69年后,时间戳是2177452800000,在移位22,就到long.max边界了”
    42. * 3、 (workerId << workerIdShift) 用途是按照数据中心,集群机器id求或,目的让数据key按照数据中心和集群规则分布
    43. * 4、sequence, 用途是按照随机数求或,目的让可以最后按照最后的key规则生成
    44. */
    45. long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
    46. return new Result(id, Status.SUCCESS);

} ```

三、如何解决时钟回拨

  • 心跳解决时钟回拨 心跳发现节点时间小于zk注册时间直接报错
  • 每次生成前检查 每次生成检查lastTimestamp,当前时间不能小于lastTimestamp

    四、segment生成Id

    利用mysql主键自增id,生成一批id然后缓存起来,然后在慢慢下发.对mysql有依赖.这也是一种不错的思路.缺点,就是对mysql有依赖,mysql如果出现异常,可能有产生重复id.