开关栅栏
import java.io.Serializable;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.locks.AbstractQueuedSynchronizer;/*** 开关栅栏,名字跟{@link java.util.concurrent.CyclicBarrier} 有点相似,也是控制多个线程的,但是概念上不同,{@link SwitchBarrier} 是通过api的方式在代码层面上对任意个数线程进行控制,* 而{@link java.util.concurrent.CyclicBarrier}中线程个数是指定的,而且阻塞是自动释放。在一些场景中无法使用,我们这里借鉴栅栏(Barrier)的概念,通过开和关的方式对不定个数的多线程进行群控* {@code* SwitchBarrier switchBarrier = new SwitchBarrier();* switchBarrier.forbidCross();* ...* <p>* // 如下执行到如下的代码的线程会阻塞掉,只有其他位置执行{@code switchBarrier.allowCross()}后,阻塞在这里的线程才会再次执行* switchBarrier.cross();* }* <p>* 这里也额外提供了一个静态内置map,也可以通过静态方式的方式对某个"开关栅栏"进行开启和关闭* {@code* // 一旦执行这里,则经过SwitchBarrier.cross("xxx");的线程会被阻塞* SwitchBarrier.forbidCross("xxx");* ...* // 如果'栅栏'关闭,则会阻塞在这里,等待allowCross("xxx")执行后才能再次执行* SwitchBarrier.cross("xxx");* ...* // 执行这里,则阻塞在 SwitchBarrier.cross("xxx"); 这里的线程会重新被唤醒* SwitchBarrier.allowCross("xxx");* }* <p>* 提示:* 开启多次和开启一次效果是一样的,同样,关闭多次和关闭一次效果也是一样的,穿过一次和穿过多次效果也是一样(前提是开关没有变化)** @author shizi* @since 2020-11-23 12:01:36*/public class SwitchBarrier {private final Sync sync;/*** 开关栅栏名称map*/private static final Map<String, SwitchBarrier> SWITCH_BARRIER_MAP = new ConcurrentHashMap<>();/*** state为0认为栅栏放下(关闭),不允许通行,为1或其他则表示栅栏拉起(开启),允许通行*/private static final class Sync extends AbstractQueuedSynchronizer implements Serializable {Sync() {// 可以通行allowCross();}boolean canCross() {return 0 != getState();}void allowCross() {setState(1);}void forbidCross() {setState(0);}/*** 这里只要返回小于0 的,在share阻塞模式中就会进行阻塞** @param acquires 忽略* @return {@code -1} 在state为0时候进行阻塞使用*/@Overrideprotected int tryAcquireShared(int acquires) {return 0 == getState() ? -1 : 1;}/*** 释放** @param releases release个数* @return {@code true}*/@Overrideprotected boolean tryReleaseShared(int releases) {return true;}}public SwitchBarrier() {sync = new Sync();}/*** 开启栅栏,禁止通过* <p>* 该函数执行后,执行函数{@link SwitchBarrier#cross()}的线程就会阻塞在这里*/public void forbidCross() {sync.forbidCross();}/*** 开启某个栅栏* <p>* 该函数执行后,执行函数{@link SwitchBarrier#cross(String)}的线程就会阻塞在这里*/public static void forbidCross(String name) {SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);if (null != switchBarrierCache) {switchBarrierCache.forbidCross();} else {SwitchBarrier switchBarrier = new SwitchBarrier();switchBarrier.forbidCross();SWITCH_BARRIER_MAP.put(name, switchBarrier);}}/*** 关闭栅栏,允许通过* <p>* 该函数执行后,会通知阻阻塞在函数{@link SwitchBarrier#cross()}的线程重新执行,同时后续经过函数{@link SwitchBarrier#cross()}会直接通过*/public void allowCross() {sync.allowCross();sync.releaseShared(1);}/*** 关闭某个栅栏* <p>* 该函数执行后,会通知阻塞在函数{@link SwitchBarrier#cross(String)}这里的多个线程重新执行,同时后续经过函数{@link SwitchBarrier#cross(String)}会直接通过*/public static void allowCross(String name) {SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);if (null != switchBarrierCache) {switchBarrierCache.allowCross();} else {SwitchBarrier switchBarrier = new SwitchBarrier();switchBarrier.allowCross();SWITCH_BARRIER_MAP.put(name, switchBarrier);}}/*** 通过栅栏* <p>* 如果开启了栅栏,则经过该函数的线程阻塞,否则直接通过*/public void cross() throws InterruptedException {sync.acquireSharedInterruptibly(0);}public static void cross(String name) throws InterruptedException {SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);if (null != switchBarrierCache) {switchBarrierCache.cross();} else {SwitchBarrier switchBarrier = new SwitchBarrier();switchBarrier.cross();SWITCH_BARRIER_MAP.put(name, switchBarrier);}}/*** 栅栏是否可以通过*/public boolean canCross() {return sync.canCross();}/*** 某个栅栏是否可以通过*/public static boolean canCross(String name) {SwitchBarrier switchBarrierCache = SWITCH_BARRIER_MAP.get(name);if (null != switchBarrierCache) {return switchBarrierCache.canCross();} else {SwitchBarrier switchBarrier = new SwitchBarrier();SWITCH_BARRIER_MAP.put(name, switchBarrier);return switchBarrier.canCross();}}public static void put(String name, SwitchBarrier switchBarrier) {SWITCH_BARRIER_MAP.put(name, switchBarrier);}public static SwitchBarrier getSwitchBarrier(String name) {return SWITCH_BARRIER_MAP.get(name);}}
测试用例
public class SwitchBarrierDemo {ExecutorService executor = Executors.newCachedThreadPool();private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, r -> {Thread thread = new Thread(r, "pivot-client-watch-daemon-thread");thread.setDaemon(true);return thread;});SwitchBarrier switchBarrier = new SwitchBarrier();Integer count = 7;Integer available = 0;@SneakyThrows@Testpublic void test1() {scheduler.scheduleWithFixedDelay(() -> {if (count <= 0) {// 在3次之后就不可用了switchBarrier.forbidCross();show("服务不可用了 ************** ");available++;if(available == 10) {count = 7;available = 0;}} else {show("服务可用 ------------- ");switchBarrier.allowCross();count--;}}, 1, 1, TimeUnit.SECONDS);for (Integer index = 0; index < 3; index++) {Integer finalIndex = index;executor.submit(() -> {while (true) {switchBarrier.cross();show("请求服务:" + finalIndex);// 监听应用端口,对应的应用Thread.sleep(2 * 1000);show("收到服务返回:" + finalIndex);}});}Thread.sleep(1000 * 1000);}}
输出打印
请求服务:0请求服务:1请求服务:2服务可用 -------------收到服务返回:1收到服务返回:2请求服务:2收到服务返回:0请求服务:1请求服务:0服务可用 -------------服务可用 -------------收到服务返回:2请求服务:2收到服务返回:1请求服务:1收到服务返回:0请求服务:0服务可用 -------------服务可用 -------------收到服务返回:0请求服务:0收到服务返回:1请求服务:1收到服务返回:2请求服务:2服务可用 -------------服务可用 -------------收到服务返回:1收到服务返回:0收到服务返回:2请求服务:0请求服务:1请求服务:2服务不可用了 **************服务不可用了 **************收到服务返回:2收到服务返回:1收到服务返回:0服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务可用 -------------请求服务:2请求服务:1请求服务:0服务可用 -------------收到服务返回:0请求服务:0收到服务返回:1请求服务:1收到服务返回:2请求服务:2服务可用 -------------服务可用 -------------收到服务返回:0请求服务:0收到服务返回:2请求服务:2收到服务返回:1请求服务:1服务可用 -------------服务可用 -------------收到服务返回:0请求服务:0收到服务返回:2请求服务:2收到服务返回:1请求服务:1服务可用 -------------服务不可用了 **************收到服务返回:2收到服务返回:0收到服务返回:1服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务不可用了 **************服务可用 -------------请求服务:2请求服务:1请求服务:0服务可用 -------------收到服务返回:1收到服务返回:0请求服务:0收到服务返回:2请求服务:2请求服务:1服务可用 -------------服务可用 -------------
