开关栅栏

    1. import java.io.Serializable;
    2. import java.util.Map;
    3. import java.util.concurrent.ConcurrentHashMap;
    4. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    5. /**
    6. * 开关栅栏,名字跟{@link java.util.concurrent.CyclicBarrier} 有点相似,也是控制多个线程的,但是概念上不同,{@link SwitchBarrier} 是通过api的方式在代码层面上对任意个数线程进行控制,
    7. * 而{@link java.util.concurrent.CyclicBarrier}中线程个数是指定的,而且阻塞是自动释放。在一些场景中无法使用,我们这里借鉴栅栏(Barrier)的概念,通过开和关的方式对不定个数的多线程进行群控
    8. * {@code
    9. * SwitchBarrier switchBarrier = new SwitchBarrier();
    10. * switchBarrier.forbidCross();
    11. * ...
    12. * <p>
    13. * // 如下执行到如下的代码的线程会阻塞掉,只有其他位置执行{@code switchBarrier.allowCross()}后,阻塞在这里的线程才会再次执行
    14. * switchBarrier.cross();
    15. * }
    16. * <p>
    17. * 这里也额外提供了一个静态内置map,也可以通过静态方式的方式对某个"开关栅栏"进行开启和关闭
    18. * {@code
    19. * // 一旦执行这里,则经过SwitchBarrier.cross("xxx");的线程会被阻塞
    20. * SwitchBarrier.forbidCross("xxx");
    21. * ...
    22. * // 如果'栅栏'关闭,则会阻塞在这里,等待allowCross("xxx")执行后才能再次执行
    23. * SwitchBarrier.cross("xxx");
    24. * ...
    25. * // 执行这里,则阻塞在 SwitchBarrier.cross("xxx"); 这里的线程会重新被唤醒
    26. * SwitchBarrier.allowCross("xxx");
    27. * }
    28. * <p>
    29. * 提示:
    30. * 开启多次和开启一次效果是一样的,同样,关闭多次和关闭一次效果也是一样的,穿过一次和穿过多次效果也是一样(前提是开关没有变化)
    31. *
    32. * @author shizi
    33. * @since 2020-11-23 12:01:36
    34. */
    35. public class SwitchBarrier {
    36. private final Sync sync;
    37. /**
    38. * 开关栅栏名称map
    39. */
    40. private static final Map<String, SwitchBarrier> SWITCH_BARRIER_MAP = new ConcurrentHashMap<>();
    41. /**
    42. * state为0认为栅栏放下(关闭),不允许通行,为1或其他则表示栅栏拉起(开启),允许通行
    43. */
    44. private static final class Sync extends AbstractQueuedSynchronizer implements Serializable {
    45. Sync() {
    46. // 可以通行
    47. allowCross();
    48. }
    49. boolean canCross() {
    50. return 0 != getState();
    51. }
    52. void allowCross() {
    53. setState(1);
    54. }
    55. void forbidCross() {
    56. setState(0);
    57. }
    58. /**
    59. * 这里只要返回小于0 的,在share阻塞模式中就会进行阻塞
    60. *
    61. * @param acquires 忽略
    62. * @return {@code -1} 在state为0时候进行阻塞使用
    63. */
    64. @Override
    65. protected int tryAcquireShared(int acquires) {
    66. return 0 == getState() ? -1 : 1;
    67. }
    68. /**
    69. * 释放
    70. *
    71. * @param releases release个数
    72. * @return {@code true}
    73. */
    74. @Override
    75. protected boolean tryReleaseShared(int releases) {
    76. return true;
    77. }
    78. }
    79. public SwitchBarrier() {
    80. sync = new Sync();
    81. }
    82. /**
    83. * 开启栅栏,禁止通过
    84. * <p>
    85. * 该函数执行后,执行函数{@link SwitchBarrier#cross()}的线程就会阻塞在这里
    86. */
    87. public void forbidCross() {
    88. sync.forbidCross();
    89. }
    90. /**
    91. * 开启某个栅栏
    92. * <p>
    93. * 该函数执行后,执行函数{@link SwitchBarrier#cross(String)}的线程就会阻塞在这里
    94. */
    95. public static void forbidCross(String name) {
    96. SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);
    97. if (null != switchBarrierCache) {
    98. switchBarrierCache.forbidCross();
    99. } else {
    100. SwitchBarrier switchBarrier = new SwitchBarrier();
    101. switchBarrier.forbidCross();
    102. SWITCH_BARRIER_MAP.put(name, switchBarrier);
    103. }
    104. }
    105. /**
    106. * 关闭栅栏,允许通过
    107. * <p>
    108. * 该函数执行后,会通知阻阻塞在函数{@link SwitchBarrier#cross()}的线程重新执行,同时后续经过函数{@link SwitchBarrier#cross()}会直接通过
    109. */
    110. public void allowCross() {
    111. sync.allowCross();
    112. sync.releaseShared(1);
    113. }
    114. /**
    115. * 关闭某个栅栏
    116. * <p>
    117. * 该函数执行后,会通知阻塞在函数{@link SwitchBarrier#cross(String)}这里的多个线程重新执行,同时后续经过函数{@link SwitchBarrier#cross(String)}会直接通过
    118. */
    119. public static void allowCross(String name) {
    120. SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);
    121. if (null != switchBarrierCache) {
    122. switchBarrierCache.allowCross();
    123. } else {
    124. SwitchBarrier switchBarrier = new SwitchBarrier();
    125. switchBarrier.allowCross();
    126. SWITCH_BARRIER_MAP.put(name, switchBarrier);
    127. }
    128. }
    129. /**
    130. * 通过栅栏
    131. * <p>
    132. * 如果开启了栅栏,则经过该函数的线程阻塞,否则直接通过
    133. */
    134. public void cross() throws InterruptedException {
    135. sync.acquireSharedInterruptibly(0);
    136. }
    137. public static void cross(String name) throws InterruptedException {
    138. SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);
    139. if (null != switchBarrierCache) {
    140. switchBarrierCache.cross();
    141. } else {
    142. SwitchBarrier switchBarrier = new SwitchBarrier();
    143. switchBarrier.cross();
    144. SWITCH_BARRIER_MAP.put(name, switchBarrier);
    145. }
    146. }
    147. /**
    148. * 栅栏是否可以通过
    149. */
    150. public boolean canCross() {
    151. return sync.canCross();
    152. }
    153. /**
    154. * 某个栅栏是否可以通过
    155. */
    156. public static boolean canCross(String name) {
    157. SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);
    158. if (null != switchBarrierCache) {
    159. return switchBarrierCache.canCross();
    160. } else {
    161. SwitchBarrier switchBarrier = new SwitchBarrier();
    162. SWITCH_BARRIER_MAP.put(name, switchBarrier);
    163. return switchBarrier.canCross();
    164. }
    165. }
    166. public static void put(String name, SwitchBarrier switchBarrier) {
    167. SWITCH_BARRIER_MAP.put(name, switchBarrier);
    168. }
    169. public static SwitchBarrier getSwitchBarrier(String name) {
    170. return SWITCH_BARRIER_MAP.get(name);
    171. }
    172. }

    测试用例

    1. public class SwitchBarrierDemo {
    2. ExecutorService executor = Executors.newCachedThreadPool();
    3. private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, r -> {
    4. Thread thread = new Thread(r, "pivot-client-watch-daemon-thread");
    5. thread.setDaemon(true);
    6. return thread;
    7. });
    8. SwitchBarrier switchBarrier = new SwitchBarrier();
    9. Integer count = 7;
    10. Integer available = 0;
    11. @SneakyThrows
    12. @Test
    13. public void test1() {
    14. scheduler.scheduleWithFixedDelay(() -> {
    15. if (count <= 0) {
    16. // 在3次之后就不可用了
    17. switchBarrier.forbidCross();
    18. show("服务不可用了 ************** ");
    19. available++;
    20. if(available == 10) {
    21. count = 7;
    22. available = 0;
    23. }
    24. } else {
    25. show("服务可用 ------------- ");
    26. switchBarrier.allowCross();
    27. count--;
    28. }
    29. }, 1, 1, TimeUnit.SECONDS);
    30. for (Integer index = 0; index < 3; index++) {
    31. Integer finalIndex = index;
    32. executor.submit(() -> {
    33. while (true) {
    34. switchBarrier.cross();
    35. show("请求服务:" + finalIndex);
    36. // 监听应用端口,对应的应用
    37. Thread.sleep(2 * 1000);
    38. show("收到服务返回:" + finalIndex);
    39. }
    40. });
    41. }
    42. Thread.sleep(1000 * 1000);
    43. }
    44. }

    输出打印

    1. 请求服务:0
    2. 请求服务:1
    3. 请求服务:2
    4. 服务可用 -------------
    5. 收到服务返回:1
    6. 收到服务返回:2
    7. 请求服务:2
    8. 收到服务返回:0
    9. 请求服务:1
    10. 请求服务:0
    11. 服务可用 -------------
    12. 服务可用 -------------
    13. 收到服务返回:2
    14. 请求服务:2
    15. 收到服务返回:1
    16. 请求服务:1
    17. 收到服务返回:0
    18. 请求服务:0
    19. 服务可用 -------------
    20. 服务可用 -------------
    21. 收到服务返回:0
    22. 请求服务:0
    23. 收到服务返回:1
    24. 请求服务:1
    25. 收到服务返回:2
    26. 请求服务:2
    27. 服务可用 -------------
    28. 服务可用 -------------
    29. 收到服务返回:1
    30. 收到服务返回:0
    31. 收到服务返回:2
    32. 请求服务:0
    33. 请求服务:1
    34. 请求服务:2
    35. 服务不可用了 **************
    36. 服务不可用了 **************
    37. 收到服务返回:2
    38. 收到服务返回:1
    39. 收到服务返回:0
    40. 服务不可用了 **************
    41. 服务不可用了 **************
    42. 服务不可用了 **************
    43. 服务不可用了 **************
    44. 服务不可用了 **************
    45. 服务不可用了 **************
    46. 服务不可用了 **************
    47. 服务不可用了 **************
    48. 服务可用 -------------
    49. 请求服务:2
    50. 请求服务:1
    51. 请求服务:0
    52. 服务可用 -------------
    53. 收到服务返回:0
    54. 请求服务:0
    55. 收到服务返回:1
    56. 请求服务:1
    57. 收到服务返回:2
    58. 请求服务:2
    59. 服务可用 -------------
    60. 服务可用 -------------
    61. 收到服务返回:0
    62. 请求服务:0
    63. 收到服务返回:2
    64. 请求服务:2
    65. 收到服务返回:1
    66. 请求服务:1
    67. 服务可用 -------------
    68. 服务可用 -------------
    69. 收到服务返回:0
    70. 请求服务:0
    71. 收到服务返回:2
    72. 请求服务:2
    73. 收到服务返回:1
    74. 请求服务:1
    75. 服务可用 -------------
    76. 服务不可用了 **************
    77. 收到服务返回:2
    78. 收到服务返回:0
    79. 收到服务返回:1
    80. 服务不可用了 **************
    81. 服务不可用了 **************
    82. 服务不可用了 **************
    83. 服务不可用了 **************
    84. 服务不可用了 **************
    85. 服务不可用了 **************
    86. 服务不可用了 **************
    87. 服务不可用了 **************
    88. 服务不可用了 **************
    89. 服务可用 -------------
    90. 请求服务:2
    91. 请求服务:1
    92. 请求服务:0
    93. 服务可用 -------------
    94. 收到服务返回:1
    95. 收到服务返回:0
    96. 请求服务:0
    97. 收到服务返回:2
    98. 请求服务:2
    99. 请求服务:1
    100. 服务可用 -------------
    101. 服务可用 -------------