简单线程池
- 固定线程数,没有动态调整线程数(maximunPoolSize)
- 使用java的LinkedBlockingQueue,默认资源耗尽为止,无拒绝策略
线程池类:MyThreadPoolCopy
public class MyThreadPoolCopy {
BlockingQueue<Task> blockingQueue;
List<MyThreadCopy> myThreadCopyList;
public MyThreadPoolCopy(int poolSize, int queueSize) {
blockingQueue = new LinkedBlockingDeque<>(queueSize);
myThreadCopyList = new ArrayList<>(poolSize);
for (int i = 0; i < poolSize; i++) {
MyThreadCopy myThreadCopy = new MyThreadCopy();
myThreadCopy.start();
myThreadCopyList.add(myThreadCopy);
}
}
public void submit(Task task) throws InterruptedException {
blockingQueue.put(task);
}
class MyThreadCopy extends Thread {
@Override
public void run() {
while (true) {
try {
Task take = blockingQueue.take();
take.executeJob();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
任务类:Task
public class Task {
String value;
public Task(String value) {
this.value = value;
}
public void executeJob() {
System.out.println("value:" + value + ", Thread:" + Thread.currentThread().getName());
}
}
调用方式
public static void main(String[] args) throws InterruptedException {
MyThreadPoolCopy copy = new MyThreadPoolCopy(2, 10);
Task task1 = new Task("AAA");
Task task2 = new Task("BBB");
Task task3 = new Task("CCC");
Task task4 = new Task("DDD");
Task task5 = new Task("EEE");
Task task6 = new Task("FFF");
copy.submit(task1);
copy.submit(task2);
copy.submit(task3);
copy.submit(task4);
copy.submit(task5);
copy.submit(task6);
}
优化
- 自己实现任务队列
- 自己实现拒绝策略
- 支持线程数的动态调整
- 对外提供安全的线程关闭接口
https://zhuanlan.zhihu.com/p/69294791
https://www.cnblogs.com/niuyourou/p/12494982.html
需要有个监控线程池,不断循环维护activeCount和queueSize平衡,新增如下方法
while (!this.isShutDown && !Thread.currentThread().isInterrupted()) {
try {
timeUnit.sleep(10);
} catch (InterruptedException e) {
this.isShutDown = true;
break;
}
synchronized (this) {
if (this.isShutDown) {
//DCL检查
break;
}
//任务超过当前线程数两倍,线程池马力全开
System.out.println(taskQueue.getSize() + " ===============================");
if (taskQueue.getSize() >= activeCount * 2 && activeCount < maxSize) {
int beforeActive = activeCount;
for (int i = 0; i < maxSize; i++) {
addThread();
}
System.out.println("add : active ->" + beforeActive + "---->" + activeCount);
continue;
}
if (taskQueue.getSize() > 0 && activeCount < coreSize) {
for (int i = 0; i < coreSize; i++) {
addThread();
}
System.out.println();
continue;
}
if (taskQueue.getSize() == 0 && activeCount > coreSize) {
for (int i = coreSize; i < activeCount(); i++) {
removeThread();
System.out.println("remove : active-> " + activeCount());
}
}
}
}
对外优雅关闭需要设置shutdown标志位以及interupter()中断,如上面的MyThreadCopy可以改成如下写法 ```java … private volatile boolean isRunning = true;
@Override public void run() { while (isRunning && !this.isInterrupted()) { try { Runnable runnable = queue.getTask(); runnable.run(); //即executeJob() } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + “ 接收到interrupted中断信号”); e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + “ 终止执行!”); }
//关闭线程 public void shutDown() { this.isRunning = false; this.interrupt(); } ```
扩展
- 看ThreadPoolExecutor源码
- 引申出AQS源码