@Slf4jpublic class BufferManager{ /** * 业务命名空间 */ private String namespace; private BitSequenceConfig buffer1 = new BitSequenceConfig(); private BitSequenceConfig buffer2 = new BitSequenceConfig(); private BitSequenceConfig currentBuffer; private PaddedLong currentSequence; /** * 刷新的状态 */ private volatile RefreshEnum refreshState = RefreshEnum.NON; /** * 这里暂时设置刷新比率为0.4 */ private float refreshRatio = 0.4f; private int refreshValue = (int) (SEQ_MAX_SIZE * refreshRatio); public BufferManager(BitSequenceDTO bitSequenceDTO) { if (null == bitSequenceDTO) { return; } this.namespace = bitSequenceDTO.getNamespace(); buffer1.update(bitSequenceDTO); this.currentBuffer = buffer1; this.currentSequence = new PaddedLong(0); } public int getRsv() { return currentBuffer.getRsv(); } public long getTime() { return currentBuffer.getTime(); } /** * 双buffer方式获取新的序列值 * * @return 新的序列 */ public long getSequence() { long sequence; while (true) { sequence = currentSequence.get(); // 二级buffer刷新处理 if (reachRefreshBuffer(sequence)) { if (refreshState == RefreshEnum.READY_ASYNC) { continue; } synchronized (this) { if (refreshState == RefreshEnum.READY_ASYNC) { continue; } refreshState = RefreshEnum.READY_ASYNC; } // 刷新二级buffer refreshBuffer(); } // 到达最后 if (sequence >= SEQ_MAX_SIZE) { if (refreshState != RefreshEnum.READY_RPC && refreshState != RefreshEnum.FINISH) { throw new SnowflakeException("server端获取新的buffer失败"); } switchBuffer(); } return currentSequence.getAndIncrement(); } } public int getWorkId() { return currentBuffer.getWorkId(); } /** * 到达刷新二级buffer的点 */ private boolean reachRefreshBuffer(long sequence) { return (sequence >= refreshValue && refreshState != RefreshEnum.READY_RPC && refreshState != RefreshEnum.FINISH); } private void refreshBuffer() { CompletableFuture.runAsync(() -> { try { synchronized (this) { refreshState = RefreshEnum.READY_RPC; updateBuffer(SnowflakeSeqGeneratorFactory.getInstance().getSequenceApi().getNext(namespace)); refreshState = RefreshEnum.FINISH; } } catch (Throwable e) { log.error("rpc 调用异常", e); } }); } /** * 到达最后则进行buffer切换 */ private synchronized void switchBuffer() { if (refreshState == RefreshEnum.READY_RPC) { throw new SnowflakeException("server端获取新的buffer失败"); } // 表示切换完成 if (refreshState == RefreshEnum.NON) { return; } if (currentBuffer == buffer1) { currentBuffer = buffer2; } else if (currentBuffer == buffer2) { currentBuffer = buffer1; } currentSequence.set(0); refreshState = RefreshEnum.NON; } private void updateBuffer(PlainResult<BitSequenceDTO> sequenceDTO) { if (!sequenceDTO.isOk()) { log.error("server返回信息异常:rsp={}", sequenceDTO); throw new SnowflakeException("server返回信息异常"); } BitSequenceDTO bitSequence = sequenceDTO.getData(); if (currentBuffer == buffer1) { buffer2.update(bitSequence); } else if (currentBuffer == buffer2) { buffer1.update(bitSequence); } } /** * 二级buffer的刷新状态 */ enum RefreshEnum { /** * 无状态 */ NON, /** * 准备创建异步 */ READY_ASYNC, /** * 准备rpc获取远端数据 */ READY_RPC, /** * 完成 */ FINISH }}