把一个大任务拆成多个子任务进行并行计算再把拆分的子任务的计算结果进行合并
基本方法
1.fork()创建异步执行的子任务
2.等待任务完成后返回计算结果
3.开始执行任务 必要时等待其执行结果
RecursiveAction
无返回结果的任务
RecursiveTask
有返回结果的类
invoke(ForkJoinTask)
提交任务并一直阻塞 直到任务执行完成返回合并结果
executr()
异步执行没有返回值
submit(ForkJoinTask)
异步执行任务 返回Task本身 通过task.get()方法获取合并后的结果
应用
public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Integer> task = forkJoinPool.submit(new CalculationTask(1, 2002));try {Integer integer = task.get();System.out.println("执行结果"+integer);}catch (Exception e) {e.printStackTrace();}}private static final Integer MAX = 400;static class CalculationTask extends RecursiveTask<Integer> {private Integer startValue; //子任务开始计算的值private Integer endValue; //子任务结束计算的值public CalculationTask(Integer startValue, Integer endValue) {this.startValue = startValue;this.endValue = endValue;}@Overrideprotected Integer compute() {if (endValue - startValue < MAX) {System.out.println("开始计算"+"startValue="+startValue+"endValue="+endValue);Integer totalValue = 0 ;for (int index = this.startValue; index <=this.endValue ; index++) {totalValue +=index;}return totalValue;}return createSubtasks();}private Integer createSubtasks() {CalculationTask task = new CalculationTask(startValue,(startValue+endValue)/2);task.fork();CalculationTask task1 = new CalculationTask((startValue+endValue)/2+1,endValue);task1.fork();return task.join()+task1.join();}}
实现原理
1.使用forkJoinPool.submit提交任务 如果是第一次提交 需要初始化 forkJoinPool中的workQueues数组
workQueue(工作队列包含属性)
ForkJoinTasks[]
用来存放通过 submit /execute方法提交的 ForkJoinTask
ForkJoinTaskWorkerThreadowner是 ForkJoinPoll中的工作线程 该线程用于执行具体的ForkJoinTask
ForkJoinPoll 指向当前ForkJoinPoll实例的引用 该引用是为了当ForkJoinTask数组中的任务处理完成之后 再次获取任务并交给ForkJoinTaskWorkerThread处理
2.通过r&m&SQMASK进行取模计算 计算WorkQueues数组的下标 把当前ForkJoinTask添加到指定位置
m表示 workQueues数组长度
r是通过 ThreadLocalRandom.getProbe得到的随机数
SQMASK = 0*007e表示任何整数和 SQMASK进行与运算后得到的一定是偶数 也就是第一次提交的任务会放到 workQueues的偶数位
3.任务提交后 需要安排线程来执行 如果工作线程数不够且没有正在等待的线程则创建一个新的ForkJoinWorkerThread
4.初始化时候 ForkJoinWorkerThread线程会调用registerWorker方法绑定一个工作线程 也就是把ForkJoinPoll中的WorkQueues数组的奇位数分配给当前线程
5.启动创建好的线程 从当前线程绑定的工作队列中获取任务来执行 由于第一次进来时存储数据在数组的偶数位 而当前线程绑定的是奇数位 所以当前线程工作队列中没有任务 所以会从其他线程窃取
6.当前线程执行完成后发现没有任务需要执行则等待
工作窃取
由于每个工作线程都从自己的工作队列中来获得任务执行 如果某个工作线程执行完自己工作队列中的任务 就会进入阻塞状态 有可能其他的工作线程还有任务没有执行完
工作窃取就是当自己的工作线程任务执行完毕 从其他工作队列中窃取任务来执行为了避免任务获取存在的竞争线程进行工作窃取时是从队列的尾部来获取任务的
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalPush(task);return task;}
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;int r = ThreadLocalRandom.getProbe();int rs = runState;if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);//将任务添加到当前WorkQueue的ForkJoinTask数组中U.putOrderedInt(q, QTOP, s + 1);//释放QTOP索引U.putIntVolatile(q, QLOCK, 0);//释放锁if (n <= 1) //当前队列的任务处理完毕 工作线程属于阻塞状态signalWork(ws, q); //唤醒或者创建线程return;}U.compareAndSwapInt(q, QLOCK, 1, 0);}//如果存在线程竞争或者WorkQueues数组没有初始化externalSubmit(task);}
private void externalSubmit(ForkJoinTask<?> task) {int r; // initialize caller's probe//得到一个探针hash值if ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;//当前线程池的状态为TERMINATE拒绝添加任务if ((rs = runState) < 0) {tryTerminate(false, false); // help terminatethrow new RejectedExecutionException();}//队列为空 进行初始化else if ((rs & STARTED) == 0 || // initialize((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;//获得锁rs = lockRunState();try {if ((rs & STARTED) == 0) {U.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// create workQueues array with size a power of twoint p = config & SMASK; // ensure at least 2 slots//保证数组长度为2的N次幂int n = (p > 1) ? p - 1 : 1;n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;workQueues = new WorkQueue[n];ns = STARTED;}} finally {unlockRunState(rs, (rs & ~RSLOCK) | ns);}}//随机从workQueues数组中找到一个偶数位下标对应的 workQueue 把任务添加到队列中else if ((q = ws[k = r & m & SQMASK]) != null) {if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;int s = q.top;boolean submitted = false; // initial submission or resizingtry { // locked version of pushif ((a != null && a.length > s + 1 - q.base) ||(a = q.growArray()) != null) {//计算存储偏移量int j = (((a.length - 1) & s) << ASHIFT) + ABASE;//把任务存储到数组的指定位置U.putOrderedObject(a, j, task);//修改索引U.putOrderedInt(q, QTOP, s + 1);submitted = true;}} finally {U.compareAndSwapInt(q, QLOCK, 1, 0);}//任务提交成功 唤醒或者创建工作线程来执行if (submitted) {signalWork(ws, q);return;}}move = true; // move on failure}//如果指定偶数位下标的还未初始化 则构建一个新的workQueue保存到数组中该下标位置else if (((rs = runState) & RSLOCK) == 0) { // create new queueq = new WorkQueue(this, null);q.hint = r;q.config = k | SHARED_QUEUE;q.scanState = INACTIVE;rs = lockRunState(); // publish indexif (rs > 0 && (ws = workQueues) != null &&k < ws.length && ws[k] == null)ws[k] = q; // else terminatedunlockRunState(rs, rs & ~RSLOCK);}//不满足上面的条件 重新更新hash探针 继续寻找数组的下一个元素elsemove = true; // move if busyif (move)r = ThreadLocalRandom.advanceProbe(r);}}
唤醒或创建工作线程
final void signalWork(WorkQueue[] ws, WorkQueue q) {long c; int sp, i; WorkQueue v; Thread p;while ((c = ctl) < 0L) {// too few active//没有空闲的工作线程if ((sp = (int)c) == 0) {// no idle workers//工作线程还没有到达阈值if ((c & ADD_WORKER) != 0L) // too few workerstryAddWorker(c); //创建工作线程break;}//队列为空 可能线程已经终止或未初始化if (ws == null) // unstarted/terminatedbreak;if (ws.length <= (i = sp & SMASK)) // terminatedbreak;if ((v = ws[i]) == null) // terminatingbreak;int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanStateint d = sp - v.scanState; // screen CAS//设置活跃工作线程数 总工作线程数long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {v.scanState = vs; // activate vif ((p = v.parker) != null)U.unpark(p); //唤醒工作线程break;}if (q != null && q.base == q.top) // no more workbreak;}}
