1. Leaf initialization
- Generate the workerID: Leaf uses ZooKeeper sequential nodes, so each node in the cluster receives a distinct workerID (see the sketch after this list).
- Create an ephemeral node: an ephemeral node is created, and the node's ip, port, and local timestamp are written into it, e.g.
  {"ip":"10.194.86.217","port":"8080","timestamp":1644827355855}
- Heartbeat check: to keep a Leaf node that drops offline from leaving stale data behind, its liveness is checked through periodic heartbeats.
- Clock rollback: the same heartbeat data is used to detect whether the node's clock has moved backwards.
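The workerID assignment hinges on ZooKeeper's PERSISTENT_SEQUENTIAL create mode: ZooKeeper appends a monotonically increasing numeric suffix to the node name, and that suffix becomes the workerID. The `createNode` helper invoked in `init()` below is not shown in the post; here is a minimal sketch of what it plausibly looks like, assuming Curator plus the `PATH_FOREVER` and `listenAddress` fields of the surrounding class, with `buildData()` assumed to produce the JSON payload above:

```java
// Plausible sketch (not verbatim Leaf source) of the createNode helper
// called by init() below. ZooKeeper appends a 10-digit sequence to the
// name, e.g. .../forever/10.194.86.217:8080-0000000003, and init()
// parses that numeric suffix into the workerID.
private String createNode(CuratorFramework curator) throws Exception {
    return curator.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT_SEQUENTIAL) // org.apache.zookeeper.CreateMode
            .forPath(PATH_FOREVER + "/" + listenAddress + "-",
                     buildData().getBytes()); // the ip/port/timestamp JSON shown above
}
```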
```java
public boolean init() {
    try {
        CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
        curator.start();
        Stat stat = curator.checkExists().forPath(PATH_FOREVER);
        if (stat == null) {
            // No root node: this machine is starting for the first time.
            // Create /snowflake/ip:port-000000000 and upload its data.
            zk_AddressNode = createNode(curator);
            // worker id defaults to 0
            updateLocalWorkerID(workerID);
            // Periodically report this machine's time to the forever node
            ScheduledUploadData(curator, zk_AddressNode);
            return true;
        } else {
            Map<String, Integer> nodeMap = Maps.newHashMap();  // ip:port -> 00001
            Map<String, String> realNode = Maps.newHashMap();  // ip:port -> (ipport-000001)
            // Root node exists: check whether a node for this machine is already registered
            List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
            for (String key : keys) {
                String[] nodeKey = key.split("-");
                realNode.put(nodeKey[0], key);
                nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
            }
            Integer workerid = nodeMap.get(listenAddress);
            if (workerid != null) {
                // Found our own node: zk_AddressNode = ip:port
                zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
                workerID = workerid;  // used when the worker starts
                if (!checkInitTimeStamp(curator, zk_AddressNode)) {
                    throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
                }
                // Prepare to create the ephemeral node
                doService(curator);
                updateLocalWorkerID(workerID);
                LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID);
            } else {
                // A brand-new node: create its persistent node; no timestamp check needed
                String newNode = createNode(curator);
                zk_AddressNode = newNode;
                String[] nodeKey = newNode.split("-");
                workerID = Integer.parseInt(nodeKey[1]);
                doService(curator);
                updateLocalWorkerID(workerID);
                LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID);
            }
        }
    } catch (Exception e) {
        LOGGER.error("Start node ERROR {}", e);
        try {
            // ZooKeeper is unreachable: fall back to the workerID cached in the local properties file
            Properties properties = new Properties();
            properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
            workerID = Integer.valueOf(properties.getProperty("workerID"));
            LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
        } catch (Exception e1) {
            LOGGER.error("Read file error ", e1);
            return false;
        }
    }
    return true;
}
```
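`init()` delegates the heartbeat to `ScheduledUploadData` and the rollback test to `checkInitTimeStamp`, neither of which the post shows. The sketch below paraphrases their behavior: the method names come from the call sites above, while the bodies, the `Endpoint` type, and the `deserialize` helper are my reconstruction, not verbatim Leaf source:

```java
// Reconstruction (not verbatim Leaf source) of the two helpers used by init().

// Re-upload {"ip","port","timestamp"} every few seconds so the forever node
// always holds this machine's most recent local time.
private void ScheduledUploadData(final CuratorFramework curator, final String path) {
    Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> {
        try {
            // Skip the report if the local clock has rolled back since the last
            // upload, so a rollback never overwrites a newer timestamp.
            if (System.currentTimeMillis() >= lastUpdateTime) {
                curator.setData().forPath(path, buildData().getBytes());
                lastUpdateTime = System.currentTimeMillis();
            }
        } catch (Exception e) {
            LOGGER.error("update init data error path is {} error is {}", path, e);
        }
    }, 1L, 3L, TimeUnit.SECONDS);
}

// On restart, the current local time must be no earlier than the time this
// node last reported; otherwise the clock has rolled back and startup fails.
private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNode) throws Exception {
    byte[] bytes = curator.getData().forPath(zk_AddressNode);
    Endpoint endPoint = deserialize(bytes); // hypothetical: parses the JSON payload shown earlier
    return endPoint.getTimestamp() <= System.currentTimeMillis();
}
```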
2. Leaf obtains a snowflake ID
This code is the core of the snowflake algorithm; if you are unfamiliar with it, read up on snowflake first.

```java
public synchronized Result get(String key) {
    // Current timestamp in milliseconds
    long timestamp = timeGen();
    // The clock has moved backwards: current time is earlier than the last generation time
    if (timestamp < lastTimestamp) {
        long offset = lastTimestamp - timestamp;
        if (offset <= 5) {
            try {
                // Small rollback (<= 5 ms): wait twice the offset, then re-check
                wait(offset << 1);
                timestamp = timeGen();
                // Still behind the last timestamp: fail
                if (timestamp < lastTimestamp) {
                    return new Result(-1, Status.EXCEPTION);
                }
            } catch (InterruptedException e) {
                LOGGER.error("wait interrupted");
                return new Result(-2, Status.EXCEPTION);
            }
        } else {
            // Rollback too large to wait out: fail immediately
            return new Result(-3, Status.EXCEPTION);
        }
    }
    // Same millisecond as the previous key
    if (lastTimestamp == timestamp) {
        // Increment the sequence; the mask keeps it inside its bit range so keys never collide
        sequence = (sequence + 1) & sequenceMask;
        if (sequence == 0) {
            // The sequence is exhausted for this millisecond: spin until the next
            // millisecond and re-seed the sequence with a small random value, so
            // low-traffic IDs do not always end in 0 and skew modulo-based sharding
            sequence = RANDOM.nextInt(100);
            timestamp = tilNextMillis(lastTimestamp);
        }
    } else {
        // A new millisecond has started: seed the sequence randomly
        sequence = RANDOM.nextInt(100);
    }
    lastTimestamp = timestamp;
    /*
     * 1. (timestamp - twepoch): this step matters. Without it, counting starts at
     *    1970-01-01 08:00:00 and runs out around 2039; subtracting START_TIME makes the
     *    timestamp bits count from START_TIME instead, greatly extending snowflake's lifetime.
     * 2. Why is timestampLeftShift 22? Snowflake budgets about 69 years of milliseconds;
     *    shifting such a timestamp left by 22 bits just stays within Long.MAX_VALUE, so 22
     *    is the largest safe shift. (The timestamp of 1970-01-01 08:00:00 is 0; 69 years
     *    later it is about 2177452800000, which shifted left by 22 sits right at the
     *    Long.MAX_VALUE boundary.)
     * 3. (workerId << workerIdShift): OR in the machine id, so keys are distributed
     *    according to data center and cluster machine.
     * 4. sequence: OR in the (randomly seeded) sequence to fill the low bits of the final key.
     */
    long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
    return new Result(id, Status.SUCCESS);
}
```
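To make the bit layout in the final OR concrete, the standalone sketch below unpacks an ID back into its three fields. All constants are assumptions for illustration (12 sequence bits and 10 worker bits, which yield the 22-bit timestamp shift discussed above, plus a placeholder twepoch); they are not quoted from Leaf's configuration:

```java
// Standalone sketch: decompose a snowflake ID into timestamp/workerId/sequence.
// Assumed layout: | 41-bit timestamp | 10-bit workerId | 12-bit sequence |
public final class SnowflakeDecoder {

    private static final long TWEPOCH = 1288834974657L;  // placeholder custom epoch (assumption)
    private static final int SEQUENCE_BITS = 12;         // assumption
    private static final int WORKER_ID_BITS = 10;        // assumption
    private static final int WORKER_ID_SHIFT = SEQUENCE_BITS;                  // = 12
    private static final int TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS; // = 22

    public static void main(String[] args) {
        long id = 4089851978121218L; // any ID produced by get()
        long sequence = id & ((1L << SEQUENCE_BITS) - 1);
        long workerId = (id >> WORKER_ID_SHIFT) & ((1L << WORKER_ID_BITS) - 1);
        long timestampMs = (id >> TIMESTAMP_SHIFT) + TWEPOCH;
        System.out.printf("timestamp=%d workerId=%d sequence=%d%n",
                timestampMs, workerId, sequence);
    }
}
```

Shifting right by 22 and adding twepoch back also illustrates point 1 above: the stored timestamp bits only count milliseconds elapsed since the custom epoch, not since 1970.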
