1. @Slf4j
    2. public class BufferManager
    3. {
    4. /**
    5. * 业务命名空间
    6. */
    7. private String namespace;
    8. private BitSequenceConfig buffer1 = new BitSequenceConfig();
    9. private BitSequenceConfig buffer2 = new BitSequenceConfig();
    10. private BitSequenceConfig currentBuffer;
    11. private PaddedLong currentSequence;
    12. /**
    13. * 刷新的状态
    14. */
    15. private volatile RefreshEnum refreshState = RefreshEnum.NON;
    16. /**
    17. * 这里暂时设置刷新比率为0.4
    18. */
    19. private float refreshRatio = 0.4f;
    20. private int refreshValue = (int) (SEQ_MAX_SIZE * refreshRatio);
    21. public BufferManager(BitSequenceDTO bitSequenceDTO)
    22. {
    23. if (null == bitSequenceDTO)
    24. {
    25. return;
    26. }
    27. this.namespace = bitSequenceDTO.getNamespace();
    28. buffer1.update(bitSequenceDTO);
    29. this.currentBuffer = buffer1;
    30. this.currentSequence = new PaddedLong(0);
    31. }
    32. public int getRsv()
    33. {
    34. return currentBuffer.getRsv();
    35. }
    36. public long getTime()
    37. {
    38. return currentBuffer.getTime();
    39. }
    40. /**
    41. * 双buffer方式获取新的序列值
    42. *
    43. * @return 新的序列
    44. */
    45. public long getSequence()
    46. {
    47. long sequence;
    48. while (true)
    49. {
    50. sequence = currentSequence.get();
    51. // 二级buffer刷新处理
    52. if (reachRefreshBuffer(sequence))
    53. {
    54. if (refreshState == RefreshEnum.READY_ASYNC)
    55. {
    56. continue;
    57. }
    58. synchronized (this)
    59. {
    60. if (refreshState == RefreshEnum.READY_ASYNC)
    61. {
    62. continue;
    63. }
    64. refreshState = RefreshEnum.READY_ASYNC;
    65. }
    66. // 刷新二级buffer
    67. refreshBuffer();
    68. }
    69. // 到达最后
    70. if (sequence >= SEQ_MAX_SIZE)
    71. {
    72. if (refreshState != RefreshEnum.READY_RPC && refreshState != RefreshEnum.FINISH)
    73. {
    74. throw new SnowflakeException("server端获取新的buffer失败");
    75. }
    76. switchBuffer();
    77. }
    78. return currentSequence.getAndIncrement();
    79. }
    80. }
    81. public int getWorkId()
    82. {
    83. return currentBuffer.getWorkId();
    84. }
    85. /**
    86. * 到达刷新二级buffer的点
    87. */
    88. private boolean reachRefreshBuffer(long sequence)
    89. {
    90. return (sequence >= refreshValue && refreshState != RefreshEnum.READY_RPC && refreshState != RefreshEnum.FINISH);
    91. }
    92. private void refreshBuffer()
    93. {
    94. CompletableFuture.runAsync(() ->
    95. {
    96. try
    97. {
    98. synchronized (this)
    99. {
    100. refreshState = RefreshEnum.READY_RPC;
    101. updateBuffer(SnowflakeSeqGeneratorFactory.getInstance().getSequenceApi().getNext(namespace));
    102. refreshState = RefreshEnum.FINISH;
    103. }
    104. }
    105. catch (Throwable e)
    106. {
    107. log.error("rpc 调用异常", e);
    108. }
    109. });
    110. }
    111. /**
    112. * 到达最后则进行buffer切换
    113. */
    114. private synchronized void switchBuffer()
    115. {
    116. if (refreshState == RefreshEnum.READY_RPC)
    117. {
    118. throw new SnowflakeException("server端获取新的buffer失败");
    119. }
    120. // 表示切换完成
    121. if (refreshState == RefreshEnum.NON)
    122. {
    123. return;
    124. }
    125. if (currentBuffer == buffer1)
    126. {
    127. currentBuffer = buffer2;
    128. }
    129. else if (currentBuffer == buffer2)
    130. {
    131. currentBuffer = buffer1;
    132. }
    133. currentSequence.set(0);
    134. refreshState = RefreshEnum.NON;
    135. }
    136. private void updateBuffer(PlainResult<BitSequenceDTO> sequenceDTO)
    137. {
    138. if (!sequenceDTO.isOk())
    139. {
    140. log.error("server返回信息异常:rsp={}", sequenceDTO);
    141. throw new SnowflakeException("server返回信息异常");
    142. }
    143. BitSequenceDTO bitSequence = sequenceDTO.getData();
    144. if (currentBuffer == buffer1)
    145. {
    146. buffer2.update(bitSequence);
    147. }
    148. else if (currentBuffer == buffer2)
    149. {
    150. buffer1.update(bitSequence);
    151. }
    152. }
    153. /**
    154. * 二级buffer的刷新状态
    155. */
    156. enum RefreshEnum
    157. {
    158. /**
    159. * 无状态
    160. */
    161. NON,
    162. /**
    163. * 准备创建异步
    164. */
    165. READY_ASYNC,
    166. /**
    167. * 准备rpc获取远端数据
    168. */
    169. READY_RPC,
    170. /**
    171. * 完成
    172. */
    173. FINISH
    174. }
    175. }