如果您知道在 Web 服务器中,则可以配置到服务器的最大并发连接数。 如果有更多连接超出此限制,则它们必须等待直到释放或关闭某些其他连接。 此限制可以视为节流。 节流是为输出速率比输入速率慢的系统调节输入速率的能力。 必须停止系统崩溃或资源耗尽。
在与BlockingQueue
和ThreadPoolExecutor
相关的上一篇文章中,我们了解了如何创建具有以下能力的CustomThreadPoolExecutor
:
1)提交到阻塞队列
的任务,2)一个执行器,从队列中拾取任务并执行它们,3)已在ExecuteGate
之后覆盖了Execute()
方法以执行一些必要的额外活动,4)附加了一个RejectedExecutionHandler
,用于处理由于队列已满而被拒绝的任务
我们的方法已经足够好,并且能够处理大多数实际情况。 现在,我们再添加一个概念,在某些情况下可能会证明是有益的。 这个概念是围绕队列中任务提交的限制。
在此示例中,节流将有助于使队列中的任务数保持在限制范围内,从而使任何任务都不会被拒绝。 它从本质上也消除了RejectedExecutionHandler
的必要性。
使用CustomThreadPoolExecutor
和RejectedExecutionHandler
的先前解决方案
在此解决方案中,我们有以下类:
DemoTask.java
public class DemoTask implements Runnable
{
private String name = null;
public DemoTask(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
@Override
public void run(){
try {
Thread.sleep(1000);
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("Executing : " + name);
}
}
CustomThreadPoolExecutor.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPoolExecutor extends ThreadPoolExecutor
{
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r)
{
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
if (t != null)
{
t.printStackTrace();
}
}
}
DemoExecutor.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DemoExecutor
{
public static void main(String[] args)
{
Integer threadCounter = 0;
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Lets add another time : " + ((DemoTask) r).getName());
executor.execute(r);
}
});
// Let start all core threads initially
executor.prestartAllCoreThreads();
while (true)
{
threadCounter++;
// Adding threads one by one
//System.out.println("Adding DemoTask : " + threadCounter);
executor.execute(new DemoTask(threadCounter.toString()));
if (threadCounter == 1000)
break;
}
}
}
如果运行上述程序,则将获得输出,如下所示:
DemoTask Rejected : 71
Executing : 3
Executing : 5
...
...
将出现多次“DemoTask Rejected
”。 在下一个解决方案中,我们将使用节流技术,以使任何任务都不会被拒绝。
使用ThreadPoolExecutor
和Semaphore
限制任务的提交率
在此解决方案中,我们将创建一个Semaphore
,其编号必须等于在任何给定时间点阻塞队列中的最大任务数。 因此该方法如下所示:
1)在执行任务之前,要求锁定信号量
2)如果获取了锁定,则执行正常。 否则,将重试直到获得锁
3)任务完成后; 锁被释放到信号量
我们启用节流的新BlockingThreadPoolExecutor
如下所示:
package threadpoolDemo;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor
{
private final Semaphore semaphore;
public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
semaphore = new Semaphore(corePoolSize + 50);
}
@Override
protected void beforeExecute(Thread t, Runnable r)
{
super.beforeExecute(t, r);
}
@Override
public void execute(final Runnable task)
{
boolean acquired = false;
do
{
try
{
semaphore.acquire();
acquired = true;
} catch (final InterruptedException e)
{
//LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
}
} while (!acquired);
try
{
super.execute(task);
} catch (final RejectedExecutionException e)
{
System.out.println("Task Rejected");
semaphore.release();
throw e;
}
}
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
if (t != null)
{
t.printStackTrace();
}
semaphore.release();
}
}
现在,如下测试代码。
package threadpoolDemo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DemoExecutor
{
public static void main(String[] args)
{
Integer threadCounter = 0;
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Lets add another time : " + ((DemoTask) r).getName());
executor.execute(r);
}
});
// Let start all core threads initially
executor.prestartAllCoreThreads();
while (true)
{
threadCounter++;
// Adding threads one by one
System.out.println("Adding DemoTask : " + threadCounter);
executor.execute(new DemoTask(threadCounter.toString()));
if (threadCounter == 1000)
break;
}
}
}
当使用BlockingThreadPoolExecutor
代替CustomThreadPoolExecutor
运行DemoExecutor
程序时,您不会看到任何任务被拒绝,并且所有任务都将成功执行。
您可以控制在任何时候通过Semaphore
构造函数传递参数的任务数量。
这就是这篇文章的全部内容。 您应该阅读有关并发的更多信息,以提高信心。
学习愉快!