vlambda博客
学习文章列表

JUC中的ForkJoin为什么这么快?基于jdk13的代码带你分析

点击上方☝ SpringForAll社区  轻松关注!
及时获取有趣有料的技术文章

本文来源:

http://summerisgreen.com/blog/2020-02-07-2020-02-03-forkjoin%E4%B8%AD%E7%9A%84%E7%AA%83%E5%8F%96%E6%9C%BA%E5%88%B6.html


从jdk1.7开始,jdk提供了ForkJoin相关工具类。 与其他ExecutorService实现相比,其能够充分地利用多核优势来提高执行性能:

  1. 通过采用工作窃取算法,使得未被分配任务的工作线程也能从忙碌线程那窃取任务协助执行。

  2. 抽象ForkJoinTask提供了模板方法,以更好地支持将大任务拆分成小任务的分而治冶思想。

本文基于jdk13的代码,来探讨下ForkJoin框架对于任务何时窃取,以及怎么窃取的问题。

WorkQueue

ForkJoinPool中的WorkQueue被声明为数组形式,其通过区分外部提交和任务执行过程中产生的任务, 优先执行提交任务所产生的子任务,从而优先完成已经开始执行的提交任务。 该数组根据任务的来源区分为:

数组下标 任务来源 是否有绑定执行线程 被窃取方式
奇数 ForkJoinTask产生的子任务 绑定ForkJoinWorkerThread 1.runWorker中的scan方法,2.awaitJoin方法窃取的队列任务
偶数 外部提交(非ForkJoinWorkerThread) 1.runWorker中的scan方法
 
   
   
 
  1. WorkQueue[] workQueues; // main registry

从上面的表格可以看出,窃取分为工作线程主循环中的例程性窃取和join阻塞时辅助性窃取。

主循环中的例程性窃取

例程性的工作窃取是发生在工作线程主循环中调用链:forkJoinWorkerThread.run() -> pool.runWorker(workQueue) -> scan(workQueue, r)中的scan方法。 其保证了优先完成已开始执行的ForkJoin任务,并且外部提交的任务在任务较多时不发生饥饿现象。

通过scan方法可以看出,其根据调用方runWorker方法所传入的随机数r开始,查找非空的窃取队列。 找到后即调用当前forkJoinWorkerThread对应WorkQueue的topLevelExec,开始常规性运行过程。

 
   
   
 
  1. private boolean scan(WorkQueue w, int r) {

  2. WorkQueue[] ws; int n;

  3. if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {

  4. for (int m = n - 1, j = r & m;;) {

  5. WorkQueue q; int b;

  6. if ((q = ws[j]) != null && q.top != (b = q.base)) {

  7. int qid = q.id;

  8. ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;

  9. if ((a = q.array) != null && (cap = a.length) > 0) {

  10. t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);

  11. if (q.base == b++ && t != null &&

  12. QA.compareAndSet(a, k, t, null)) {

  13. q.base = b;

  14. w.source = qid;

  15. if (a[(cap - 1) & b] != null)

  16. signalWork(q); // help signal if more tasks

  17. w.topLevelExec(t, q, // random fairness bound

  18. (r | (1 << TOP_BOUND_SHIFT)) & SMASK);

  19. }

  20. }

  21. return true;

  22. }

  23. else if (--n > 0)

  24. j = (j + 1) & m;

  25. else

  26. break;

  27. }

  28. }

  29. return false;

  30. }

topLevelExec方法是执行本地队列任务及窃取任务的主要方法,窃取任务从指定窃取队列以FIFO的顺序取出; 而本地队列FIFO还是LIFO,则由构造器方法中的asyncMode决定(默认为false,也就是LIFO)。

 
   
   
 
  1. public ForkJoinPool(int parallelism,

  2. ForkJoinWorkerThreadFactory factory,

  3. UncaughtExceptionHandler handler,

  4. boolean asyncMode)


  5. mode = parallelism | (asyncMode ? FIFO : 0);

考虑到调用该方法时,本地队列中必定是不存在任务的,因此该次调用中直接执行窃取队列首部的ForkJoinTask。 之后每执行n次(>= 2^10)本地队列任务,尝试执行一次窃取队列任务。 当本地及窃取队列均没有待执行任务时,返回到runWork方法,调整窃取的随机数r后,继续调用scan方法寻找合适的窃取队列。

 
   
   
 
  1. final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {

  2. int nstolen = 1;

  3. for (int j = 0;;) {

  4. if (t != null)

  5. t.doExec();

  6. if (j++ <= n)

  7. t = nextLocalTask();

  8. else {

  9. j = 0;

  10. t = null;

  11. }

  12. if (t == null) {

  13. if (q != null && (t = q.poll()) != null) {

  14. ++nstolen;

  15. j = 0;

  16. }

  17. else if (j != 0)

  18. break;

  19. }

  20. }

  21. ForkJoinWorkerThread thread = owner;

  22. nsteals += nstolen;

  23. source = 0;

  24. if (thread != null)

  25. thread.afterTopLevelExec();

  26. }

join阻塞时的窃取

forkjoin任务的join操作主要是通过调用doJoin方法来完成的。 在该方法中,可以看到其根据当前线程是否是forkjoin工作线程(ForkJoinWorkerThread)分别采取两种不同的策略。

 
   
   
 
  1. private int doJoin() {

  2. int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;

  3. return (s = status) < 0 ? s :

  4. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?

  5. (w = (wt = (ForkJoinWorkerThread)t).workQueue).

  6. tryUnpush(this) && (s = doExec()) < 0 ? s :

  7. wt.pool.awaitJoin(w, this, 0L) :

  8. externalAwaitDone();

  9. }

非ForkJoinWorkerThread的Join

对于非ForkJoinWorkerThread的外部线程来说,其不会窃取执行其他任务。 只会在阻塞等待任务前,调用tryExternalHelp方法,尝试当前线程直接执行提交任务。 是否能直接执行提交任务,取决于是否能够满足下面三个条件。 值得注意的是,外部线程执行过程中产生的子任务将提交到common池(ForkJoinPool.common)中。

  1. 任务提交在common池中

  2. 任务提交时的线程Probe值和当前的指向同一个提交队列

  3. 任务位于队列尾部

 
   
   
 
  1. private int tryExternalHelp() {

  2. int s;

  3. return ((s = status) < 0 ? s:

  4. (this instanceof CountedCompleter) ?

  5. ForkJoinPool.common.externalHelpComplete(

  6. (CountedCompleter<?>)this, 0) :

  7. ForkJoinPool.common.tryExternalUnpush(this) ?

  8. doExec() : 0);

  9. }


  10. final boolean tryExternalUnpush(ForkJoinTask<?> task) {

  11. int r = ThreadLocalRandom.getProbe();

  12. WorkQueue[] ws; WorkQueue w; int n;

  13. return ((ws = workQueues) != null &&

  14. (n = ws.length) > 0 &&

  15. (w = ws[(n - 1) & r & SQMASK]) != null &&

  16. w.tryLockedUnpush(task));

  17. }

ForkJoinWorkerThread的Join

对于forkjoin工作线程来说,当等待join的任务位于当前线程的工作队列尾部时,尝试直接执行当前任务。 否则调用forkJoinWorkerThread.pool.awaitJoin方法。

 
   
   
 
  1. private int doJoin() {

  2. int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;

  3. return (s = status) < 0 ? s :

  4. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?

  5. (w = (wt = (ForkJoinWorkerThread)t).workQueue).

  6. tryUnpush(this) && (s = doExec()) < 0 ? s :

  7. wt.pool.awaitJoin(w, this, 0L) :

  8. externalAwaitDone();

  9. }

在awaitJoin方法中,其首先通过tryRemoveAndExec方法,在w(本地工作队列)中查找并执行等待join的任务。 如果依旧是未完成状态,其将进行如下的窃取循环:

  1. 在子任务工作队列(奇数索引)中窃取队列头部的task执行。

  2. 任务完成则退出。

  3. 当子任务工作队列均为空队列,且tryCompensate方法返回值非0,则阻塞至deadline,否则重新进行窃取循环。

tryCompensate方法在没有足够的工作线程时,会创建或激活一个空闲线程来补偿join线程阻塞导致的并行度不足。 该方法返回0表示需要重试,其仅在工作队列状态不一致或cas更新状态失败时出现。

 
   
   
 
  1. final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {

  2. int s = 0;

  3. int seed = ThreadLocalRandom.nextSecondarySeed();

  4. if (w != null && task != null &&

  5. (!(task instanceof CountedCompleter) ||

  6. (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {

  7. w.tryRemoveAndExec(task);

  8. int src = w.source, id = w.id;

  9. int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;

  10. s = task.status;

  11. while (s >= 0) {

  12. WorkQueue[] ws;

  13. int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;

  14. while (n > 0) {

  15. WorkQueue q; int b;

  16. if ((q = ws[r & m]) != null && q.source == id &&

  17. q.top != (b = q.base)) {

  18. /* 执行队列头部任务 */

  19. break;

  20. }

  21. else {

  22. r += step;

  23. --n;

  24. }

  25. }

  26. if ((s = task.status) < 0)

  27. break;

  28. else if (n == 0) { // empty scan

  29. long ms, ns; int block;

  30. if ((ns = deadline - System.nanoTime()) <= 0L)

  31. break; // timeout

  32. else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)

  33. ms = 1L; // avoid 0 for timed wait

  34. if ((block = tryCompensate(w)) != 0) {

  35. task.internalWait(ms);

  36. CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);

  37. }

  38. s = task.status;

  39. }

  40. }

  41. }

  42. return s;

  43. }