JDK源码系列:Future是如何实现的?
大家好,我们在异步编程时向线程池提交(submit)一个任务后会得到一个 Future对象,通过 future.get() 方法可以堵塞等待结果的完成,例如:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//准备一个futureTask
//FutureTask 是一个实现了 RunnableFuture接口(Runnable+Future 接口)的类
RunnableFuture<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
public Boolean call() throws Exception {
System.out.println(Thread.currentThread().getId()+"do something");
return true;
}
});
//提交到一个线程上或者线程池上执行
Executors.newSingleThreadExecutor().submit(futureTask);
//获取执行结果
futureTask.get();
futureTask.get(1,TimeUnit.SECONDS);
}
通过对submit方法源码的查看可以知道,无论是你提交的这个任务 是 Runnable 对象 还是Callable对象 或者是 RunnableFuture对象,都会返回一个Future对象,实际上这个对象是一个RunnableFuture对象(FutureTask)。如下:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
//返回的都是FutureTask
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
//返回的都是FutureTask
return new FutureTask<T>(callable);
}
通过上面的分析可知,返回的Future对象的本质是FutureTask类,今天就分析下 FutureTask 类的实现原理。
任务submit(futureTask)之后会被线程池中某个线程获取执行,关于线程池的执行原理与过程可以看下老吕之前对线程池的分析文章()。
今天主要看下 futureTask.get()和futureTask.get(1,TimeUnit.SECONDS)的背后发生了什么?
FutureTask类代码阅读并注释
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 可能的状态变迁
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
//state 代表了当前任务的状态
private volatile int state;
private static final int NEW = 0;//还没被调度,只有这时候可以被取消
private static final int COMPLETING = 1;//已被调度,还未执行完毕
private static final int NORMAL = 2;//执行完毕,并且正常
private static final int EXCEPTIONAL = 3;//执行完毕,有异常
private static final int CANCELLED = 4;//任务被取消
private static final int INTERRUPTING = 5;//任务中断中
private static final int INTERRUPTED = 6;//任务已被中断
//这个就是要执行的任务
private Callable<V> callable;
//无论是正常结果还是异常 都会附到 outcome上
private Object outcome;
//用来执行任务的线程
private volatile Thread runner;
/** Treiber stack of waiting threads */
//等待结果的线程集合(单向链表,Treiber stack 特赖贝尔 设计,
//链表节点的追加是通过CAS方式进行的,可以解决 多线程并发追加的安全问题
//(比如多个线程同时调用了futureTask.get() 来获取结果,都会被堵塞住,做一个排队))
private volatile WaitNode waiters;
//返回结果或者抛出异常
"unchecked") (
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
//构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
//构造函数
public FutureTask(Runnable runnable, V result) {
//将Runnable 做了一个适配,转换为 Callable对象
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
//任务是否被取消了
public boolean isCancelled() {
return state >= CANCELLED;
}
//任务是否已经开始执行
public boolean isDone() {
return state != NEW;
}
//取消任务的执行
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//任务状态为NEW并且通过CAS替换成INTERRUPTING或CANCELLED 失败
//则返回失败
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
//中断正在执行任务的线程
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//通过所有等待结果的线程
finishCompletion();
}
return true;
}
//堵塞获取结果,关键方法,主要关注下 awaitDone 方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)//如果未完成,则进入等待
s = awaitDone(false, 0L);
return report(s);
}
//带超时时间的堵塞获取结果,关键方法,主要关注下 awaitDone 方法
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
protected void done() { }
//关键方法,回写正常执行结果,并唤醒所有等待的线程
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//CAS加锁成功
outcome = v;//正常结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // CAS改变任务最终状态:正常完成
//唤醒所有等待的线程
finishCompletion();
}
}
//异常结果回写
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//CAS加锁成功
outcome = t;//异常信息
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // CAS改变任务最终状态:异常
//唤醒所有等待的线程
finishCompletion();
}
}
//关键方法:任务执行后,会唤醒所有等待的线程 LockSupport.unpark(thread);
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
//防止多个线程执行同一个futureTask任务对象
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行核心任务逻辑
result = c.call();
ran = true;
} catch (Throwable ex) {
//有异常
result = null;
ran = false;
//回写异常并唤醒所有等待的线程
setException(ex);
}
if (ran) //回写正常结果并唤醒所有等待的线程
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
//特殊用途,先不关注
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
//使用了简单链表来记录等待执行结果的线程
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
//移除并唤醒所有等待结果的线程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//关键代码,唤醒执行通过 LockSupport.park 进入等待的线程
//与awaitDone 方法呼应
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
//等待任务完成,关键方法,
//没有计时的用 LockSupport.park(this);
//有计时的用 LockSupport.parkNanos(this, nanos);
//这里与 finishCompletion 中的 LockSupport.unpark 呼应
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
//排队,通过cas追加链表节点
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//移除超时的节点
removeWaiter(q);
return state;
}
//关键代码 限时等待
LockSupport.parkNanos(this, nanos);
}
else
//关键代码 无限期等待
LockSupport.park(this);
}
}
//移除某个等待的节点
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;//记录 state 变量的内存地址偏移量(相对与futureTask对象起始地址)
private static final long runnerOffset;//记录 执行任务的线程 变量的内存地址偏移量
private static final long waitersOffset;//记录 等待链表的头节点 变量的内存地址偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
}
总结:
通过代码分析可以了解到
1)当某个线程调用 get()或者 get(long timeout, TimeUnit unit)方法获取结果时,如果任务还没有完成,则会通过CAS方式将调用线程加入到一个单向链表中,并且通过 LockSupport.park(this);
或者 LockSupport.parkNanos(this, nanos)实现线程自身的堵塞
2)当线程池中线程调度任务后会执行FutureTask类的run()方法,在run方法中会调用 Callable接口的call方法执行真正的任务代码,执行完成后 回写结果或异常到 outcome对象中,并且唤醒所有等待的线程,使用了 LockSupport.unpark(thread); 方法,这与 get中的 LockSupport.park 相呼应
3)为了解决线程安全问题,大量使用了CAS算法,大量应用 Unsafe 类中的CAS方法
4)示意图
今天就到这里,晚安!