原文: https://howtodoinjava.com/java/multi-threading/throttling-task-submission-rate-using-threadpoolexecutor-and-semaphore/

如果您知道在 Web 服务器中,则可以配置到服务器的最大并发连接数。 如果有更多连接超出此限制,则它们必须等待直到释放或关闭某些其他连接。 此限制可以视为节流。 节流是为输出速率比输入速率慢的系统调节输入速率的能力。 必须停止系统崩溃或资源耗尽。

在与BlockingQueueThreadPoolExecutor相关的上一篇文章中,我们了解了如何创建具有以下能力的CustomThreadPoolExecutor

1)提交到阻塞队列
的任务,2)一个执行器,从队列中拾取任务并执行它们,3)已在ExecuteGate之后覆盖了Execute()方法以执行一些必要的额外活动,4)附加了一个RejectedExecutionHandler,用于处理由于队列已满而被拒绝的任务

我们的方法已经足够好,并且能够处理大多数实际情况。 现在,我们再添加一个概念,在某些情况下可能会证明是有益的。 这个概念是围绕队列中任务提交的限制。

在此示例中,节流将有助于使队列中的任务数保持在限制范围内,从而使任何任务都不会被拒绝。 它从本质上也消除了RejectedExecutionHandler的必要性。

使用CustomThreadPoolExecutorRejectedExecutionHandler的先前解决方案

在此解决方案中,我们有以下类:

DemoTask.java

  1. public class DemoTask implements Runnable
  2. {
  3. private String name = null;
  4. public DemoTask(String name) {
  5. this.name = name;
  6. }
  7. public String getName() {
  8. return this.name;
  9. }
  10. @Override
  11. public void run(){
  12. try {
  13. Thread.sleep(1000);
  14. } catch (InterruptedException e){
  15. e.printStackTrace();
  16. }
  17. System.out.println("Executing : " + name);
  18. }
  19. }

CustomThreadPoolExecutor.java

  1. import java.util.concurrent.BlockingQueue;
  2. import java.util.concurrent.ThreadPoolExecutor;
  3. import java.util.concurrent.TimeUnit;
  4. public class CustomThreadPoolExecutor extends ThreadPoolExecutor
  5. {
  6. public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
  7. TimeUnit unit, BlockingQueue<Runnable> workQueue)
  8. {
  9. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  10. }
  11. @Override
  12. protected void beforeExecute(Thread t, Runnable r)
  13. {
  14. super.beforeExecute(t, r);
  15. }
  16. @Override
  17. protected void afterExecute(Runnable r, Throwable t)
  18. {
  19. super.afterExecute(r, t);
  20. if (t != null)
  21. {
  22. t.printStackTrace();
  23. }
  24. }
  25. }

DemoExecutor.java

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.RejectedExecutionHandler;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6. public class DemoExecutor
  7. {
  8. public static void main(String[] args)
  9. {
  10. Integer threadCounter = 0;
  11. BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
  12. CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
  13. executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
  14. {
  15. @Override
  16. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
  17. {
  18. System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
  19. try
  20. {
  21. Thread.sleep(1000);
  22. } catch (InterruptedException e)
  23. {
  24. e.printStackTrace();
  25. }
  26. System.out.println("Lets add another time : " + ((DemoTask) r).getName());
  27. executor.execute(r);
  28. }
  29. });
  30. // Let start all core threads initially
  31. executor.prestartAllCoreThreads();
  32. while (true)
  33. {
  34. threadCounter++;
  35. // Adding threads one by one
  36. //System.out.println("Adding DemoTask : " + threadCounter);
  37. executor.execute(new DemoTask(threadCounter.toString()));
  38. if (threadCounter == 1000)
  39. break;
  40. }
  41. }
  42. }

如果运行上述程序,则将获得输出,如下所示:

  1. DemoTask Rejected : 71
  2. Executing : 3
  3. Executing : 5
  4. ...
  5. ...

将出现多次“DemoTask Rejected”。 在下一个解决方案中,我们将使用节流技术,以使任何任务都不会被拒绝。

使用ThreadPoolExecutorSemaphore限制任务的提交率

在此解决方案中,我们将创建一个Semaphore,其编号必须等于在任何给定时间点阻塞队列中的最大任务数。 因此该方法如下所示:

1)在执行任务之前,要求锁定信号量
2)如果获取了锁定,则执行正常。 否则,将重试直到获得锁
3)任务完成后; 锁被释放到信号量

我们启用节流的新BlockingThreadPoolExecutor如下所示:

  1. package threadpoolDemo;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.RejectedExecutionException;
  4. import java.util.concurrent.Semaphore;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. import java.util.concurrent.TimeUnit;
  7. public class BlockingThreadPoolExecutor extends ThreadPoolExecutor
  8. {
  9. private final Semaphore semaphore;
  10. public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
  11. {
  12. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  13. semaphore = new Semaphore(corePoolSize + 50);
  14. }
  15. @Override
  16. protected void beforeExecute(Thread t, Runnable r)
  17. {
  18. super.beforeExecute(t, r);
  19. }
  20. @Override
  21. public void execute(final Runnable task)
  22. {
  23. boolean acquired = false;
  24. do
  25. {
  26. try
  27. {
  28. semaphore.acquire();
  29. acquired = true;
  30. } catch (final InterruptedException e)
  31. {
  32. //LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
  33. }
  34. } while (!acquired);
  35. try
  36. {
  37. super.execute(task);
  38. } catch (final RejectedExecutionException e)
  39. {
  40. System.out.println("Task Rejected");
  41. semaphore.release();
  42. throw e;
  43. }
  44. }
  45. @Override
  46. protected void afterExecute(Runnable r, Throwable t)
  47. {
  48. super.afterExecute(r, t);
  49. if (t != null)
  50. {
  51. t.printStackTrace();
  52. }
  53. semaphore.release();
  54. }
  55. }

现在,如下测试代码。

  1. package threadpoolDemo;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.RejectedExecutionHandler;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. import java.util.concurrent.TimeUnit;
  7. public class DemoExecutor
  8. {
  9. public static void main(String[] args)
  10. {
  11. Integer threadCounter = 0;
  12. BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
  13. BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
  14. executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
  15. {
  16. @Override
  17. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
  18. {
  19. System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
  20. try
  21. {
  22. Thread.sleep(1000);
  23. } catch (InterruptedException e)
  24. {
  25. e.printStackTrace();
  26. }
  27. System.out.println("Lets add another time : " + ((DemoTask) r).getName());
  28. executor.execute(r);
  29. }
  30. });
  31. // Let start all core threads initially
  32. executor.prestartAllCoreThreads();
  33. while (true)
  34. {
  35. threadCounter++;
  36. // Adding threads one by one
  37. System.out.println("Adding DemoTask : " + threadCounter);
  38. executor.execute(new DemoTask(threadCounter.toString()));
  39. if (threadCounter == 1000)
  40. break;
  41. }
  42. }
  43. }

当使用BlockingThreadPoolExecutor代替CustomThreadPoolExecutor运行DemoExecutor程序时,您不会看到任何任务被拒绝,并且所有任务都将成功执行。

您可以控制在任何时候通过Semaphore构造函数传递参数的任务数量。

这就是这篇文章的全部内容。 您应该阅读有关并发的更多信息,以提高信心。

学习愉快!