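Simple thread pool
- Fixed number of threads; no dynamic resizing (no equivalent of maximumPoolSize)
- Tasks are held in Java's LinkedBlockingQueue. Created without a capacity it grows until resources are exhausted; with the capacity passed in the code below, submit blocks while the queue is full. Either way there is no rejection policy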
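Thread pool class: MyThreadPoolCopy

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class MyThreadPoolCopy {
        BlockingQueue<Task> blockingQueue;
        List<MyThreadCopy> myThreadCopyList;

        public MyThreadPoolCopy(int poolSize, int queueSize) {
            // Bounded queue: put() blocks once queueSize tasks are waiting.
            blockingQueue = new LinkedBlockingQueue<>(queueSize);
            myThreadCopyList = new ArrayList<>(poolSize);
            // Start every worker up front; the thread count never changes afterwards.
            for (int i = 0; i < poolSize; i++) {
                MyThreadCopy myThreadCopy = new MyThreadCopy();
                myThreadCopy.start();
                myThreadCopyList.add(myThreadCopy);
            }
        }

        public void submit(Task task) throws InterruptedException {
            blockingQueue.put(task);
        }

        class MyThreadCopy extends Thread {
            @Override
            public void run() {
                // Loop forever: block until a task arrives, then run it.
                while (true) {
                    try {
                        Task take = blockingQueue.take();
                        take.executeJob();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
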
Task class: Task

    public class Task {
        String value;

        public Task(String value) {
            this.value = value;
        }

        // Prints the task's value together with the name of the worker thread running it.
        public void executeJob() {
            System.out.println("value:" + value + ", Thread:" + Thread.currentThread().getName());
        }
    }

Usage

    public static void main(String[] args) throws InterruptedException {
        // 2 worker threads, at most 10 queued tasks
        MyThreadPoolCopy copy = new MyThreadPoolCopy(2, 10);
        Task task1 = new Task("AAA");
        Task task2 = new Task("BBB");
        Task task3 = new Task("CCC");
        Task task4 = new Task("DDD");
        Task task5 = new Task("EEE");
        Task task6 = new Task("FFF");
        copy.submit(task1);
        copy.submit(task2);
        copy.submit(task3);
        copy.submit(task4);
        copy.submit(task5);
        copy.submit(task6);
        // The workers are non-daemon threads that loop forever,
        // so the JVM keeps running after main returns.
    }

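Optimizations
- Implement the task queue yourself
- Implement the rejection policy yourself
- Support dynamic adjustment of the thread count
- Expose a safe shutdown interface to callers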
https://zhuanlan.zhihu.com/p/69294791
https://www.cnblogs.com/niuyourou/p/12494982.html
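The first two optimization items (a hand-written task queue and a rejection policy) could look roughly like the sketch below. It is only an illustration: RejectPolicy, SimpleTaskQueue, RejectPolicies and the three policies are hypothetical names, not taken from the linked articles. Only getTask() and getSize() mirror the method names used in the snippets of these notes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical policy interface: decides what happens when the queue is full. */
interface RejectPolicy {
    void reject(Runnable task, SimpleTaskQueue queue);
}

/** Hypothetical bounded FIFO queue guarded by its own monitor. */
class SimpleTaskQueue {
    private final Deque<Runnable> tasks = new ArrayDeque<>();
    private final int capacity;
    private final RejectPolicy policy;

    SimpleTaskQueue(int capacity, RejectPolicy policy) {
        this.capacity = capacity;
        this.policy = policy;
    }

    /** Enqueues the task, or hands it to the policy when the queue is full. */
    void offer(Runnable task) {
        synchronized (this) {
            if (tasks.size() < capacity) {
                tasks.addLast(task);
                notifyAll(); // wake workers blocked in getTask()
                return;
            }
        }
        // Call the policy outside the lock so a slow policy cannot stall workers.
        policy.reject(task, this);
    }

    /** Blocks until a task is available, then removes and returns it. */
    synchronized Runnable getTask() throws InterruptedException {
        while (tasks.isEmpty()) {
            wait();
        }
        return tasks.removeFirst();
    }

    synchronized int getSize() {
        return tasks.size();
    }
}

/** A few hypothetical policies, loosely modelled on the ones the JDK ships. */
class RejectPolicies {
    /** Fails fast by throwing to the submitter. */
    static final RejectPolicy ABORT = (task, queue) -> {
        throw new IllegalStateException("task queue is full");
    };
    /** Silently drops the task. */
    static final RejectPolicy DISCARD = (task, queue) -> { };
    /** Runs the task on the submitting thread, which slows the producer down. */
    static final RejectPolicy CALLER_RUNS = (task, queue) -> task.run();
}
```

With this shape, the pool's submit would call offer instead of put, so a full queue no longer blocks the caller indefinitely; what happens instead depends on the policy passed to the constructor.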
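The pool needs a monitor thread that loops continuously and keeps activeCount in balance with the queue size. Add the following loop:

    while (!this.isShutDown && !Thread.currentThread().isInterrupted()) {
        try {
            timeUnit.sleep(10);
        } catch (InterruptedException e) {
            this.isShutDown = true;
            break;
        }
        synchronized (this) {
            if (this.isShutDown) { // re-check under the lock (double-checked)
                break;
            }
            System.out.println(taskQueue.getSize() + " ===============================");
            // Queued tasks are at least twice the active thread count: grow to maxSize.
            if (taskQueue.getSize() >= activeCount * 2 && activeCount < maxSize) {
                int beforeActive = activeCount;
                int toAdd = maxSize - activeCount; // add only the missing threads
                for (int i = 0; i < toAdd; i++) {
                    addThread();
                }
                System.out.println("add : active ->" + beforeActive + "---->" + activeCount);
                continue;
            }
            // Tasks are waiting and the pool is below coreSize: grow to coreSize.
            if (taskQueue.getSize() > 0 && activeCount < coreSize) {
                int toAdd = coreSize - activeCount;
                for (int i = 0; i < toAdd; i++) {
                    addThread();
                }
                System.out.println();
                continue;
            }
            // Queue is empty and the pool is above coreSize: shrink back to coreSize.
            if (taskQueue.getSize() == 0 && activeCount > coreSize) {
                int toRemove = activeCount - coreSize; // fixed up front, since activeCount shrinks in the loop
                for (int i = 0; i < toRemove; i++) {
                    removeThread();
                    System.out.println("remove : active-> " + activeCount);
                }
            }
        }
    }
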
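The sizing rule inside the monitor loop can also be isolated as a pure function, which makes the three branches easy to check without starting any threads. This is only a sketch: PoolSizing and targetThreads are hypothetical names, and the function merely restates the conditions of the loop above.

```java
/** Hypothetical helper that restates the monitor loop's sizing rule. */
class PoolSizing {
    /**
     * Returns the thread count the monitor loop would aim for,
     * given the current queue length and active thread count.
     */
    static int targetThreads(int queued, int active, int coreSize, int maxSize) {
        // Backlog is at least twice the active threads: go to full capacity.
        if (queued >= active * 2 && active < maxSize) {
            return maxSize;
        }
        // Work is waiting and the pool is below its core size: grow to core.
        if (queued > 0 && active < coreSize) {
            return coreSize;
        }
        // Nothing is waiting and the pool is above its core size: shrink to core.
        if (queued == 0 && active > coreSize) {
            return coreSize;
        }
        // Otherwise leave the pool as it is.
        return active;
    }
}
```

For example, with coreSize 2 and maxSize 6, a backlog of 10 tasks on 2 active threads yields a target of 6, while an empty queue with 6 active threads yields a target of 2.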
A graceful shutdown exposed to callers needs both a shutdown flag and an interrupt() call. For example, the MyThreadCopy above can be rewritten as follows:

```java
…
private volatile boolean isRunning = true;

@Override
public void run() {
    while (isRunning && !this.isInterrupted()) {
        try {
            Runnable runnable = queue.getTask();
            runnable.run(); // i.e. executeJob()
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " received an interrupt signal");
            e.printStackTrace();
        }
    }
    System.out.println(Thread.currentThread().getName() + " terminated!");
}

// Shut the thread down: clear the flag first, then interrupt a blocked getTask()
public void shutDown() {
    this.isRunning = false;
    this.interrupt();
}
```
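At pool level, the same flag-plus-interrupt idea might be wired up as in the sketch below. It is an illustration under assumptions, not the implementation from the linked articles: ShutdownablePool and Worker are hypothetical names, and a plain LinkedBlockingQueue stands in for the custom task queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical pool that shows only the shutdown path. */
class ShutdownablePool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Worker> workers = new ArrayList<>();
    private volatile boolean isShutDown = false;

    ShutdownablePool(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            worker.start();
        }
    }

    void submit(Runnable task) {
        if (isShutDown) {
            throw new IllegalStateException("pool is shut down");
        }
        queue.add(task);
    }

    /** Stops accepting tasks, interrupts every worker, then waits for them to exit. */
    void shutDown() throws InterruptedException {
        isShutDown = true;
        for (Worker worker : workers) {
            worker.shutDown();
        }
        for (Worker worker : workers) {
            worker.join();
        }
    }

    private class Worker extends Thread {
        private volatile boolean isRunning = true;

        @Override
        public void run() {
            while (isRunning && !isInterrupted()) {
                try {
                    queue.take().run();
                } catch (InterruptedException e) {
                    // take() clears the interrupt flag when it throws; the loop still
                    // ends because shutDown() sets isRunning to false before interrupting.
                }
            }
        }

        void shutDown() {
            isRunning = false;
            interrupt();
        }
    }
}
```

Note that this sketch abandons tasks still sitting in the queue at shutdown, and a submit racing with shutDown may still slip a task in. A fuller version would drain the queue or return the leftover tasks to the caller.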
Further reading
- Read the ThreadPoolExecutor source
- From there, move on to the AQS (AbstractQueuedSynchronizer) source
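As a starting point for reading ThreadPoolExecutor, it may help to see how the hand-written pool maps onto the JDK constructor. The sketch below is an approximation, with JdkPoolExample and fixedPool as hypothetical names. One deliberate difference: MyThreadPoolCopy.submit blocks when the queue is full, whereas here CallerRunsPolicy runs the overflow task on the submitting thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class JdkPoolExample {
    /** Builds a JDK pool with roughly the shape of MyThreadPoolCopy(poolSize, queueSize). */
    static ThreadPoolExecutor fixedPool(int poolSize, int queueSize) {
        return new ThreadPoolExecutor(
                poolSize,                              // corePoolSize
                poolSize,                              // maximumPoolSize equals core: no dynamic growth
                0L, TimeUnit.MILLISECONDS,             // keepAliveTime, irrelevant when core equals max
                new LinkedBlockingQueue<>(queueSize),  // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy when the queue is full
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = fixedPool(2, 10);
        for (String value : new String[] {"AAA", "BBB", "CCC"}) {
            pool.execute(() -> System.out.println(
                    "value:" + value + ", Thread:" + Thread.currentThread().getName()));
        }
        pool.shutdown();                             // stop accepting tasks, let queued ones finish
        pool.awaitTermination(5, TimeUnit.SECONDS);  // wait for the workers to drain the queue
    }
}
```

Setting maximumPoolSize above corePoolSize and giving keepAliveTime a non-zero value is what turns on the dynamic sizing that the monitor thread in these notes implements by hand.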
