JUC中的ForkJoin为什么这么快?基于jdk13的代码带你分析
本文来源:
http://summerisgreen.com/blog/2020-02-07-2020-02-03-forkjoin%E4%B8%AD%E7%9A%84%E7%AA%83%E5%8F%96%E6%9C%BA%E5%88%B6.html
从jdk1.7开始,jdk提供了ForkJoin相关工具类。 与其他ExecutorService实现相比,其能够充分地利用多核优势来提高执行性能:
通过采用工作窃取算法,使得未被分配任务的工作线程也能从忙碌线程那窃取任务协助执行。
抽象ForkJoinTask提供了模板方法,以更好地支持将大任务拆分成小任务的分而治冶思想。
本文基于jdk13的代码,来探讨下ForkJoin框架对于任务何时窃取,以及怎么窃取的问题。
WorkQueue
ForkJoinPool中的WorkQueue被声明为数组形式,其通过区分外部提交和任务执行过程中产生的任务, 优先执行提交任务所产生的子任务,从而优先完成已经开始执行的提交任务。 该数组根据任务的来源区分为:
数组下标 | 任务来源 | 是否有绑定执行线程 | 被窃取方式 |
---|---|---|---|
奇数 | ForkJoinTask产生的子任务 | 绑定ForkJoinWorkerThread | 1.runWorker中的scan方法,2.awaitJoin方法窃取的队列任务 |
偶数 | 外部提交(非ForkJoinWorkerThread) | 否 | 1.runWorker中的scan方法 |
WorkQueue[] workQueues; // main registry
从上面的表格可以看出,窃取分为工作线程主循环中的例程性窃取和join阻塞时辅助性窃取。
主循环中的例程性窃取
例程性的工作窃取是发生在工作线程主循环中调用链:forkJoinWorkerThread.run() -> pool.runWorker(workQueue) -> scan(workQueue, r)中的scan方法。 其保证了优先完成已开始执行的ForkJoin任务,并且外部提交的任务在任务较多时不发生饥饿现象。
通过scan方法可以看出,其根据调用方runWorker方法所传入的随机数r开始,查找非空的窃取队列。 找到后即调用当前forkJoinWorkerThread对应WorkQueue的topLevelExec,开始常规性运行过程。
private boolean scan(WorkQueue w, int r) {
WorkQueue[] ws; int n;
if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
for (int m = n - 1, j = r & m;;) {
WorkQueue q; int b;
if ((q = ws[j]) != null && q.top != (b = q.base)) {
int qid = q.id;
ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
if ((a = q.array) != null && (cap = a.length) > 0) {
t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
if (q.base == b++ && t != null &&
QA.compareAndSet(a, k, t, null)) {
q.base = b;
w.source = qid;
if (a[(cap - 1) & b] != null)
signalWork(q); // help signal if more tasks
w.topLevelExec(t, q, // random fairness bound
(r | (1 << TOP_BOUND_SHIFT)) & SMASK);
}
}
return true;
}
else if (--n > 0)
j = (j + 1) & m;
else
break;
}
}
return false;
}
topLevelExec方法是执行本地队列任务及窃取任务的主要方法,窃取任务从指定窃取队列以FIFO的顺序取出; 而本地队列FIFO还是LIFO,则由构造器方法中的asyncMode决定(默认为false,也就是LIFO)。
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode)
mode = parallelism | (asyncMode ? FIFO : 0);
考虑到调用该方法时,本地队列中必定是不存在任务的,因此该次调用中直接执行窃取队列首部的ForkJoinTask。 之后每执行n次(>= 2^10)本地队列任务,尝试执行一次窃取队列任务。 当本地及窃取队列均没有待执行任务时,返回到runWork方法,调整窃取的随机数r后,继续调用scan方法寻找合适的窃取队列。
final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
int nstolen = 1;
for (int j = 0;;) {
if (t != null)
t.doExec();
if (j++ <= n)
t = nextLocalTask();
else {
j = 0;
t = null;
}
if (t == null) {
if (q != null && (t = q.poll()) != null) {
++nstolen;
j = 0;
}
else if (j != 0)
break;
}
}
ForkJoinWorkerThread thread = owner;
nsteals += nstolen;
source = 0;
if (thread != null)
thread.afterTopLevelExec();
}
join阻塞时的窃取
forkjoin任务的join操作主要是通过调用doJoin方法来完成的。 在该方法中,可以看到其根据当前线程是否是forkjoin工作线程(ForkJoinWorkerThread)分别采取两种不同的策略。
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
非ForkJoinWorkerThread的Join
对于非ForkJoinWorkerThread的外部线程来说,其不会窃取执行其他任务。 只会在阻塞等待任务前,调用tryExternalHelp方法,尝试当前线程直接执行提交任务。 是否能直接执行提交任务,取决于是否能够满足下面三个条件。 值得注意的是,外部线程执行过程中产生的子任务将提交到common池(ForkJoinPool.common)中。
任务提交在common池中
任务提交时的线程Probe值和当前的指向同一个提交队列
任务位于队列尾部
private int tryExternalHelp() {
int s;
return ((s = status) < 0 ? s:
(this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ?
doExec() : 0);
}
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
int r = ThreadLocalRandom.getProbe();
WorkQueue[] ws; WorkQueue w; int n;
return ((ws = workQueues) != null &&
(n = ws.length) > 0 &&
(w = ws[(n - 1) & r & SQMASK]) != null &&
w.tryLockedUnpush(task));
}
ForkJoinWorkerThread的Join
对于forkjoin工作线程来说,当等待join的任务位于当前线程的工作队列尾部时,尝试直接执行当前任务。 否则调用forkJoinWorkerThread.pool.awaitJoin方法。
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
在awaitJoin方法中,其首先通过tryRemoveAndExec方法,在w(本地工作队列)中查找并执行等待join的任务。 如果依旧是未完成状态,其将进行如下的窃取循环:
在子任务工作队列(奇数索引)中窃取队列头部的task执行。
任务完成则退出。
当子任务工作队列均为空队列,且tryCompensate方法返回值非0,则阻塞至deadline,否则重新进行窃取循环。
tryCompensate方法在没有足够的工作线程时,会创建或激活一个空闲线程来补偿join线程阻塞导致的并行度不足。 该方法返回0表示需要重试,其仅在工作队列状态不一致或cas更新状态失败时出现。
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
int seed = ThreadLocalRandom.nextSecondarySeed();
if (w != null && task != null &&
(!(task instanceof CountedCompleter) ||
(s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
w.tryRemoveAndExec(task);
int src = w.source, id = w.id;
int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
s = task.status;
while (s >= 0) {
WorkQueue[] ws;
int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
while (n > 0) {
WorkQueue q; int b;
if ((q = ws[r & m]) != null && q.source == id &&
q.top != (b = q.base)) {
/* 执行队列头部任务 */
break;
}
else {
r += step;
--n;
}
}
if ((s = task.status) < 0)
break;
else if (n == 0) { // empty scan
long ms, ns; int block;
if ((ns = deadline - System.nanoTime()) <= 0L)
break; // timeout
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L; // avoid 0 for timed wait
if ((block = tryCompensate(w)) != 0) {
task.internalWait(ms);
CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
}
s = task.status;
}
}
}
return s;
}