如果您知道在 Web 服务器中,则可以配置到服务器的最大并发连接数。 如果有更多连接超出此限制,则它们必须等待直到释放或关闭某些其他连接。 此限制可以视为节流。 节流是为输出速率比输入速率慢的系统调节输入速率的能力。 必须停止系统崩溃或资源耗尽。
在与BlockingQueue和ThreadPoolExecutor相关的上一篇文章中,我们了解了如何创建具有以下能力的CustomThreadPoolExecutor:
1)提交到阻塞队列
的任务,2)一个执行器,从队列中拾取任务并执行它们,3)已在ExecuteGate之后覆盖了Execute()方法以执行一些必要的额外活动,4)附加了一个RejectedExecutionHandler,用于处理由于队列已满而被拒绝的任务
我们的方法已经足够好,并且能够处理大多数实际情况。 现在,我们再添加一个概念,在某些情况下可能会证明是有益的。 这个概念是围绕队列中任务提交的限制。
在此示例中,节流将有助于使队列中的任务数保持在限制范围内,从而使任何任务都不会被拒绝。 它从本质上也消除了RejectedExecutionHandler的必要性。
使用CustomThreadPoolExecutor和RejectedExecutionHandler的先前解决方案
在此解决方案中,我们有以下类:
DemoTask.java
public class DemoTask implements Runnable{private String name = null;public DemoTask(String name) {this.name = name;}public String getName() {return this.name;}@Overridepublic void run(){try {Thread.sleep(1000);} catch (InterruptedException e){e.printStackTrace();}System.out.println("Executing : " + name);}}
CustomThreadPoolExecutor.java
import java.util.concurrent.BlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class CustomThreadPoolExecutor extends ThreadPoolExecutor{public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue){super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}@Overrideprotected void beforeExecute(Thread t, Runnable r){super.beforeExecute(t, r);}@Overrideprotected void afterExecute(Runnable r, Throwable t){super.afterExecute(r, t);if (t != null){t.printStackTrace();}}}
DemoExecutor.java
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class DemoExecutor{public static void main(String[] args){Integer threadCounter = 0;BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);executor.setRejectedExecutionHandler(new RejectedExecutionHandler(){@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor){System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());try{Thread.sleep(1000);} catch (InterruptedException e){e.printStackTrace();}System.out.println("Lets add another time : " + ((DemoTask) r).getName());executor.execute(r);}});// Let start all core threads initiallyexecutor.prestartAllCoreThreads();while (true){threadCounter++;// Adding threads one by one//System.out.println("Adding DemoTask : " + threadCounter);executor.execute(new DemoTask(threadCounter.toString()));if (threadCounter == 1000)break;}}}
如果运行上述程序,则将获得输出,如下所示:
DemoTask Rejected : 71Executing : 3Executing : 5......
将出现多次“DemoTask Rejected”。 在下一个解决方案中,我们将使用节流技术,以使任何任务都不会被拒绝。
使用ThreadPoolExecutor和Semaphore限制任务的提交率
在此解决方案中,我们将创建一个Semaphore,其编号必须等于在任何给定时间点阻塞队列中的最大任务数。 因此该方法如下所示:
1)在执行任务之前,要求锁定信号量
2)如果获取了锁定,则执行正常。 否则,将重试直到获得锁
3)任务完成后; 锁被释放到信号量
我们启用节流的新BlockingThreadPoolExecutor如下所示:
package threadpoolDemo;import java.util.concurrent.BlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.Semaphore;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class BlockingThreadPoolExecutor extends ThreadPoolExecutor{private final Semaphore semaphore;public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue){super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);semaphore = new Semaphore(corePoolSize + 50);}@Overrideprotected void beforeExecute(Thread t, Runnable r){super.beforeExecute(t, r);}@Overridepublic void execute(final Runnable task){boolean acquired = false;do{try{semaphore.acquire();acquired = true;} catch (final InterruptedException e){//LOGGER.warn("InterruptedException whilst aquiring semaphore", e);}} while (!acquired);try{super.execute(task);} catch (final RejectedExecutionException e){System.out.println("Task Rejected");semaphore.release();throw e;}}@Overrideprotected void afterExecute(Runnable r, Throwable t){super.afterExecute(r, t);if (t != null){t.printStackTrace();}semaphore.release();}}
现在,如下测试代码。
package threadpoolDemo;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class DemoExecutor{public static void main(String[] args){Integer threadCounter = 0;BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);executor.setRejectedExecutionHandler(new RejectedExecutionHandler(){@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor){System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());try{Thread.sleep(1000);} catch (InterruptedException e){e.printStackTrace();}System.out.println("Lets add another time : " + ((DemoTask) r).getName());executor.execute(r);}});// Let start all core threads initiallyexecutor.prestartAllCoreThreads();while (true){threadCounter++;// Adding threads one by oneSystem.out.println("Adding DemoTask : " + threadCounter);executor.execute(new DemoTask(threadCounter.toString()));if (threadCounter == 1000)break;}}}
当使用BlockingThreadPoolExecutor代替CustomThreadPoolExecutor运行DemoExecutor程序时,您不会看到任何任务被拒绝,并且所有任务都将成功执行。
您可以控制在任何时候通过Semaphore构造函数传递参数的任务数量。
这就是这篇文章的全部内容。 您应该阅读有关并发的更多信息,以提高信心。
学习愉快!
