






  1. /**
  2. * Constructs a {@code CountDownLatch} initialized with the given count.
  3. *
  4. * @param count the number of times {@link #countDown} must be invoked
  5. * before threads can pass through {@link #await}
  6. * @throws IllegalArgumentException if {@code count} is negative
  7. */
  8. public CountDownLatch(int count) {
  9. if (count < 0) throw new IllegalArgumentException("count < 0");
  10. this.sync = new Sync(count);
  11. }


  1. import java.util.concurrent.TimeUnit;
  2. /**
  3. * 类说明:线程休眠辅助工具类
  4. */
  5. public class SleepTools {
  6. /**
  7. * 按秒休眠
  8. * @param seconds 秒数
  9. */
  10. public static final void second(int seconds) {
  11. try {
  12. TimeUnit.SECONDS.sleep(seconds);
  13. } catch (InterruptedException e) {
  14. }
  15. }
  16. /**
  17. * 按毫秒数休眠
  18. * @param seconds 毫秒数
  19. */
  20. public static final void ms(int seconds) {
  21. try {
  22. TimeUnit.MILLISECONDS.sleep(seconds);
  23. } catch (InterruptedException e) {
  24. }
  25. }
  26. }
  1. import;
  2. import java.util.concurrent.CountDownLatch;
  3. /**
  4. * CountDownLatch的使用,有五个线程,6个扣除点
  5. * 扣除完成后主线程和业务线程,才能执行工作
  6. * 扣除点一般都是大于等于需要初始化的线程的
  7. */
  8. public class UseCountDownLatch {
  9. /**
  10. * 设置为6个扣除点
  11. */
  12. static CountDownLatch countDownLatch = new CountDownLatch(6);
  13. /**
  14. * 初始化线程
  15. */
  16. private static class InitThread implements Runnable {
  17. @Override
  18. public void run() {
  19. System.out.println("thread_" + Thread.currentThread().getId() + " ready init work .....");
  20. // 执行扣减 扣减不代表结束
  21. countDownLatch.countDown();
  22. for (int i = 0; i < 2; i++) {
  23. System.out.println("thread_" + Thread.currentThread().getId() + ".....continue do its work");
  24. }
  25. }
  26. }
  27. /**
  28. * 业务线程
  29. */
  30. private static class BusiThread implements Runnable {
  31. @Override
  32. public void run() {
  33. // 业务线程需要在等初始化完毕后才能执行
  34. try {
  35. countDownLatch.await();
  36. for (int i = 0; i < 3; i++) {
  37. System.out.println("BusiThread " + Thread.currentThread().getId() + " do business-----");
  38. }
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. public static void main(String[] args) {
  45. // 创建单独的初始化线程
  46. new Thread(){
  47. @Override
  48. public void run() {
  50. System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 1st.....");
  51. // 扣减一次
  52. countDownLatch.countDown();
  53. System.out.println("begin stop 2nd.....");
  55. System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 2nd.....");
  56. // 扣减一次
  57. countDownLatch.countDown();
  58. }
  59. }.start();
  60. // 启动业务线程
  61. new Thread(new BusiThread()).start();
  62. // 启动初始化线程
  63. for (int i = 0; i <= 3; i++) {
  64. new Thread(new InitThread()).start();
  65. }
  66. // 主线程进入等待
  67. try {
  68. countDownLatch.await();
  69. System.out.println("Main do ites work.....");
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. }


  1. thread_13 ready init work .....
  2. thread_13.....continue do its work
  3. thread_13.....continue do its work
  4. thread_14 ready init work .....
  5. thread_14.....continue do its work
  6. thread_14.....continue do its work
  7. thread_15 ready init work .....
  8. thread_15.....continue do its work
  9. thread_11 ready init work step 1st.....
  10. begin stop 2nd.....
  11. thread_16 ready init work .....
  12. thread_16.....continue do its work
  13. thread_16.....continue do its work
  14. thread_15.....continue do its work
  15. thread_11 ready init work step 2nd.....
  16. Main do ites work.....
  17. BusiThread 12 do business-----
  18. BusiThread 12 do business-----
  19. BusiThread 12 do business-----




  1. /**
  2. * Creates a new {@code CyclicBarrier} that will trip when the
  3. * given number of parties (threads) are waiting upon it, and
  4. * does not perform a predefined action when the barrier is tripped.
  5. *
  6. * @param parties the number of threads that must invoke {@link #await}
  7. * before the barrier is tripped
  8. * @throws IllegalArgumentException if {@code parties} is less than 1
  9. */
  10. public CyclicBarrier(int parties) {
  11. this(parties, null);
  12. }
  13. /**
  14. * Creates a new {@code CyclicBarrier} that will trip when the
  15. * given number of parties (threads) are waiting upon it, and which
  16. * will execute the given barrier action when the barrier is tripped,
  17. * performed by the last thread entering the barrier.
  18. *
  19. * @param parties the number of threads that must invoke {@link #await}
  20. * before the barrier is tripped
  21. * @param barrierAction the command to execute when the barrier is
  22. * tripped, or {@code null} if there is no action
  23. * @throws IllegalArgumentException if {@code parties} is less than 1
  24. */
  25. public CyclicBarrier(int parties, Runnable barrierAction) {
  26. if (parties <= 0) throw new IllegalArgumentException();
  27. this.parties = parties;
  28. this.count = parties;
  29. this.barrierCommand = barrierAction;
  30. }


  1. import;
  2. import java.util.Map;
  3. import java.util.Random;
  4. import java.util.concurrent.BrokenBarrierException;
  5. import java.util.concurrent.ConcurrentHashMap;
  6. import java.util.concurrent.CyclicBarrier;
  7. /**
  8. * CyclicBarrier的使用
  9. */
  10. public class UseCyclicBarrier {
  11. /**
  12. * 存放子线程工作结果的安全容器
  13. */
  14. private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
  15. private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new CollectThread());
  16. /**
  17. * 结果打印线程
  18. * 用来演示CyclicBarrier的第二个参数,barrierAction
  19. */
  20. private static class CollectThread implements Runnable {
  21. @Override
  22. public void run() {
  23. StringBuffer result = new StringBuffer();
  24. for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
  25. result.append("[" + workResult.getValue() + "]");
  26. }
  27. System.out.println("the result = " + result);
  28. System.out.println("do other business.....");
  29. }
  30. }
  31. /**
  32. * 工作子线程
  33. * 用于CyclicBarrier的一组线程
  34. */
  35. private static class SubThread implements Runnable {
  36. @Override
  37. public void run() {
  38. // 获取当前线程的ID
  39. long id = Thread.currentThread().getId();
  40. // 放入统计容器中
  41. resultMap.put(String.valueOf(id), id);
  42. Random random = new Random();
  43. try {
  44. if (random.nextBoolean()) {
  45. Thread.sleep(1000 + id);
  46. System.out.println("Thread_"+id+"..... do something");
  47. }
  48. System.out.println(id+" is await");
  49. cyclicBarrier.await();
  50. Thread.sleep(1000+id);
  51. System.out.println("Thread_"+id+" its business");
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. } catch (BrokenBarrierException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }
  59. public static void main(String[] args) {
  60. for (int i = 0; i <= 4; i++) {
  61. Thread thread = new Thread(new SubThread());
  62. thread.start();
  63. }
  64. }
  65. }


  1. 11 is await
  2. 14 is await
  3. 15 is await
  4. Thread_12..... do something
  5. 12 is await
  6. Thread_13..... do something
  7. 13 is await
  8. the result = [11][12][13][14][15]
  9. do other business.....
  10. its business
  11. its business
  12. its business
  13. its business
  14. its business

通过返回结果可以看出前面的11 14 15三个线程没有进入if语句块,在执行到await的时候进入了等待,而另外12 13两个线程进入到了if语句块当中,多休眠了1秒多,然后当5个线程同时到达await的时候,屏障开放,执行了barrierAction线程,然后线程组继续执行

CountDownLatch CyclicBarrier
控制 第三方控制 自身控制
传入数量 大于等于线程数量 等于线程数量



  1. /**
  2. * Creates a {@code Semaphore} with the given number of
  3. * permits and nonfair fairness setting.
  4. *
  5. * @param permits the initial number of permits available.
  6. * This value may be negative, in which case releases
  7. * must occur before any acquires will be granted.
  8. */
  9. public Semaphore(int permits) {
  10. sync = new NonfairSync(permits);
  11. }


  1. import;
  2. import java.sql.*;
  3. import java.util.Map;
  4. import java.util.Properties;
  5. import java.util.concurrent.Executor;
  6. /**
  7. * 数据库连接
  8. */
  9. public class SqlConnection implements Connection {
  10. /**
  11. * 获取数据库连接
  12. * @return
  13. */
  14. public static final Connection fetchConnection(){
  15. return new SqlConnection();
  16. }
  17. @Override
  18. public void commit() throws SQLException {
  20. }
  21. @Override
  22. public Statement createStatement() throws SQLException {
  24. return null;
  25. }
  26. @Override
  27. public PreparedStatement prepareStatement(String sql) throws SQLException {
  28. return null;
  29. }
  30. @Override
  31. public CallableStatement prepareCall(String sql) throws SQLException {
  32. return null;
  33. }
  34. @Override
  35. public String nativeSQL(String sql) throws SQLException {
  36. return null;
  37. }
  38. @Override
  39. public void setAutoCommit(boolean autoCommit) throws SQLException {
  40. }
  41. @Override
  42. public boolean getAutoCommit() throws SQLException {
  43. return false;
  44. }
  45. @Override
  46. public void rollback() throws SQLException {
  47. }
  48. @Override
  49. public void close() throws SQLException {
  50. }
  51. @Override
  52. public boolean isClosed() throws SQLException {
  53. return false;
  54. }
  55. @Override
  56. public DatabaseMetaData getMetaData() throws SQLException {
  57. return null;
  58. }
  59. @Override
  60. public void setReadOnly(boolean readOnly) throws SQLException {
  61. }
  62. @Override
  63. public boolean isReadOnly() throws SQLException {
  64. return false;
  65. }
  66. @Override
  67. public void setCatalog(String catalog) throws SQLException {
  68. }
  69. @Override
  70. public String getCatalog() throws SQLException {
  71. return null;
  72. }
  73. @Override
  74. public void setTransactionIsolation(int level) throws SQLException {
  75. }
  76. @Override
  77. public int getTransactionIsolation() throws SQLException {
  78. return 0;
  79. }
  80. @Override
  81. public SQLWarning getWarnings() throws SQLException {
  82. return null;
  83. }
  84. @Override
  85. public void clearWarnings() throws SQLException {
  86. }
  87. @Override
  88. public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
  89. return null;
  90. }
  91. @Override
  92. public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
  93. return null;
  94. }
  95. @Override
  96. public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
  97. return null;
  98. }
  99. @Override
  100. public Map<String, Class<?>> getTypeMap() throws SQLException {
  101. return null;
  102. }
  103. @Override
  104. public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
  105. }
  106. @Override
  107. public void setHoldability(int holdability) throws SQLException {
  108. }
  109. @Override
  110. public int getHoldability() throws SQLException {
  111. return 0;
  112. }
  113. @Override
  114. public Savepoint setSavepoint() throws SQLException {
  115. return null;
  116. }
  117. @Override
  118. public Savepoint setSavepoint(String name) throws SQLException {
  119. return null;
  120. }
  121. @Override
  122. public void rollback(Savepoint savepoint) throws SQLException {
  123. }
  124. @Override
  125. public void releaseSavepoint(Savepoint savepoint) throws SQLException {
  126. }
  127. @Override
  128. public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
  129. return null;
  130. }
  131. @Override
  132. public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
  133. return null;
  134. }
  135. @Override
  136. public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
  137. return null;
  138. }
  139. @Override
  140. public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
  141. return null;
  142. }
  143. @Override
  144. public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
  145. return null;
  146. }
  147. @Override
  148. public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
  149. return null;
  150. }
  151. @Override
  152. public Clob createClob() throws SQLException {
  153. return null;
  154. }
  155. @Override
  156. public Blob createBlob() throws SQLException {
  157. return null;
  158. }
  159. @Override
  160. public NClob createNClob() throws SQLException {
  161. return null;
  162. }
  163. @Override
  164. public SQLXML createSQLXML() throws SQLException {
  165. return null;
  166. }
  167. @Override
  168. public boolean isValid(int timeout) throws SQLException {
  169. return false;
  170. }
  171. @Override
  172. public void setClientInfo(String name, String value) throws SQLClientInfoException {
  173. }
  174. @Override
  175. public void setClientInfo(Properties properties) throws SQLClientInfoException {
  176. }
  177. @Override
  178. public String getClientInfo(String name) throws SQLException {
  179. return null;
  180. }
  181. @Override
  182. public Properties getClientInfo() throws SQLException {
  183. return null;
  184. }
  185. @Override
  186. public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
  187. return null;
  188. }
  189. @Override
  190. public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
  191. return null;
  192. }
  193. @Override
  194. public void setSchema(String schema) throws SQLException {
  195. }
  196. @Override
  197. public String getSchema() throws SQLException {
  198. return null;
  199. }
  200. @Override
  201. public void abort(Executor executor) throws SQLException {
  202. }
  203. @Override
  204. public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
  205. }
  206. @Override
  207. public int getNetworkTimeout() throws SQLException {
  208. return 0;
  209. }
  210. @Override
  211. public <T> T unwrap(Class<T> iface) throws SQLException {
  212. return null;
  213. }
  214. @Override
  215. public boolean isWrapperFor(Class<?> iface) throws SQLException {
  216. return false;
  217. }
  218. }


  1. import java.sql.Connection;
  2. import java.util.ArrayList;
  3. import java.util.HashSet;
  4. import java.util.Iterator;
  5. import java.util.LinkedList;
  6. import java.util.concurrent.Semaphore;
  7. /**
  8. * 使用信号量控制数据库的链接和释放
  9. */
  10. public class DBPoolSemaphore {
  11. /**
  12. * 池容量
  13. */
  14. private final static int POOL_SIZE = 10;
  15. /**
  16. * useful 代表可用连接
  17. * useless 代表已用连接
  18. * 为什么要使用两个Semaphore呢?是因为,在连接池中不只有连接本身是资源,空位也是资源,也需要记录
  19. */
  20. private final Semaphore useful, useless;
  21. /**
  22. * 连接池
  23. */
  24. private final static LinkedList<Connection> POOL = new LinkedList<>();
  25. /**
  26. * 使用静态块初始化池
  27. */
  28. static {
  29. for (int i = 0; i < POOL_SIZE; i++) {
  30. POOL.addLast(SqlConnection.fetchConnection());
  31. }
  32. }
  33. public DBPoolSemaphore() {
  34. // 初始可用的许可证等于池容量
  35. useful = new Semaphore(POOL_SIZE);
  36. // 初始不可用的许可证容量为0
  37. useless = new Semaphore(0);
  38. }
  39. /**
  40. * 获取数据库连接
  41. *
  42. * @return 连接对象
  43. */
  44. public Connection takeConnection() throws InterruptedException {
  45. // 可用许可证减一
  46. useful.acquire();
  47. Connection connection;
  48. synchronized (POOL) {
  49. connection = POOL.removeFirst();
  50. }
  51. // 不可用许可证数量加一
  52. useless.release();
  53. return connection;
  54. }
  55. /**
  56. * 释放链接
  57. *
  58. * @param connection 连接对象
  59. */
  60. public void returnConnection(Connection connection) throws InterruptedException {
  61. if(null!=connection){
  62. // 打印日志
  63. System.out.println("当前有"+useful.getQueueLength()+"个线程等待获取连接,,"
  64. +"可用连接有"+useful.availablePermits()+"个");
  65. // 不可用许可证减一
  66. useless.acquire();
  67. synchronized (POOL){
  68. POOL.addLast(connection);
  69. }
  70. // 可用许可证加一
  71. useful.release();
  72. }
  73. }
  74. }


  1. import;
  2. import java.sql.Connection;
  3. import java.util.Random;
  4. /**
  5. * 测试Semaphore
  6. */
  7. public class UseSemaphore {
  8. /**
  9. * 连接池
  10. */
  11. public static final DBPoolSemaphore pool = new DBPoolSemaphore();
  12. private static class BusiThread extends Thread{
  13. @Override
  14. public void run() {
  15. // 随机数工具类 为了让每个线程持有连接的时间不一样
  16. Random random = new Random();
  17. long start = System.currentTimeMillis();
  18. try {
  19. Connection connection = pool.takeConnection();
  20. System.out.println("Thread_"+Thread.currentThread().getId()+
  21. "_获取数据库连接耗时["+(System.currentTimeMillis()-start)+"]ms.");
  22. // 模拟使用连接查询数据
  24. System.out.println("查询数据完成归还连接");
  25. pool.returnConnection(connection);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. public static void main(String[] args) {
  32. for (int i = 0; i < 50; i++) {
  33. BusiThread busiThread = new BusiThread();
  34. busiThread.start();
  35. }
  36. }
  37. }


  1. Thread_11_获取数据库连接耗时[0]ms.
  2. Thread_12_获取数据库连接耗时[0]ms.
  3. Thread_13_获取数据库连接耗时[0]ms.
  4. Thread_14_获取数据库连接耗时[0]ms.
  5. Thread_15_获取数据库连接耗时[0]ms.
  6. Thread_16_获取数据库连接耗时[0]ms.
  7. Thread_17_获取数据库连接耗时[0]ms.
  8. Thread_18_获取数据库连接耗时[0]ms.
  9. Thread_19_获取数据库连接耗时[0]ms.
  10. Thread_20_获取数据库连接耗时[0]ms.
  11. 查询数据完成归还连接
  12. 当前有40个线程等待获取连接,,可用连接有0
  13. Thread_21_获取数据库连接耗时[112]ms.
  14. 查询数据完成归还连接
  15. ...................
  16. 查询数据完成归还连接
  17. 当前有2个线程等待获取连接,,可用连接有0
  18. Thread_59_获取数据库连接耗时[637]ms.
  19. 查询数据完成归还连接
  20. 当前有1个线程等待获取连接,,可用连接有0
  21. Thread_60_获取数据库连接耗时[660]ms.
  22. 查询数据完成归还连接
  23. 当前有0个线程等待获取连接,,可用连接有0
  24. 查询数据完成归还连接
  25. ...................
  26. 当前有0个线程等待获取连接,,可用连接有8
  27. 查询数据完成归还连接
  28. 当前有0个线程等待获取连接,,可用连接有9




  1. /**
  2. * Creates a new Exchanger.
  3. */
  4. public Exchanger() {
  5. participant = new Participant();
  6. }


  1. import java.util.HashSet;
  2. import java.util.Set;
  3. import java.util.concurrent.Exchanger;
  4. /**
  5. * 线程之间交换数据
  6. */
  7. public class UseExchange {
  8. private static final Exchanger<Set<String>> exchanger = new Exchanger<>();
  9. public static void main(String[] args) {
  10. new Thread(){
  11. @Override
  12. public void run() {
  13. Set<String> aSet = new HashSet<>();
  14. aSet.add("A");
  15. aSet.add("B");
  16. aSet.add("C");
  17. try {
  18. Set<String> exchange =;
  19. for (String s : exchange) {
  20. System.out.println("aSet"+s);
  21. }
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }.start();
  27. new Thread(){
  28. @Override
  29. public void run() {
  30. Set<String> bSet = new HashSet<>();
  31. bSet.add("1");
  32. bSet.add("2");
  33. bSet.add("3");
  34. try {
  35. Set<String> exchange =;
  36. for (String s : exchange) {
  37. System.out.println("bSet"+s);
  38. }
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }.start();
  44. }
  45. }


  1. bSetA
  2. bSetB
  3. bSetC
  4. aSet1
  5. aSet2
  6. aSet3
