JDK源码系列:Future是如何实现的?
        大家好,我们在异步编程时向线程池提交(submit)一个任务后会得到一个 Future对象,通过 future.get() 方法可以堵塞等待结果的完成,例如:
public static void main(String[] args) throws ExecutionException, InterruptedException {//准备一个futureTask//FutureTask 是一个实现了 RunnableFuture接口(Runnable+Future 接口)的类RunnableFuture<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {public Boolean call() throws Exception {System.out.println(Thread.currentThread().getId()+"do something");return true;}});//提交到一个线程上或者线程池上执行Executors.newSingleThreadExecutor().submit(futureTask);//获取执行结果futureTask.get();futureTask.get(1,TimeUnit.SECONDS);}
通过对submit方法源码的查看可以知道,无论是你提交的这个任务 是 Runnable 对象 还是Callable对象 或者是 RunnableFuture对象,都会返回一个Future对象,实际上这个对象是一个RunnableFuture对象(FutureTask)。如下:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {//返回的都是FutureTaskreturn new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {//返回的都是FutureTaskreturn new FutureTask<T>(callable);}
通过上面的分析可知,返回的Future对象的本质是FutureTask类,今天就分析下 FutureTask 类的实现原理。
任务submit(futureTask)之后会被线程池中某个线程获取执行,关于线程池的执行原理与过程可以看下老吕之前对线程池的分析文章()。
今天主要看下 futureTask.get()和futureTask.get(1,TimeUnit.SECONDS)的背后发生了什么?
FutureTask类代码阅读并注释
public class FutureTask<V> implements RunnableFuture<V> {/*** 可能的状态变迁* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*///state 代表了当前任务的状态private volatile int state;private static final int NEW = 0;//还没被调度,只有这时候可以被取消private static final int COMPLETING = 1;//已被调度,还未执行完毕private static final int NORMAL = 2;//执行完毕,并且正常private static final int EXCEPTIONAL = 3;//执行完毕,有异常private static final int CANCELLED = 4;//任务被取消private static final int INTERRUPTING = 5;//任务中断中private static final int INTERRUPTED = 6;//任务已被中断//这个就是要执行的任务private Callable<V> callable;//无论是正常结果还是异常 都会附到 outcome上private Object outcome;//用来执行任务的线程private volatile Thread runner;/** Treiber stack of waiting threads *///等待结果的线程集合(单向链表,Treiber stack 特赖贝尔 设计,//链表节点的追加是通过CAS方式进行的,可以解决 多线程并发追加的安全问题//(比如多个线程同时调用了futureTask.get() 来获取结果,都会被堵塞住,做一个排队))private volatile WaitNode waiters;//返回结果或者抛出异常("unchecked")private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}//构造函数public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;}//构造函数public FutureTask(Runnable runnable, V result) {//将Runnable 做了一个适配,转换为 Callable对象this.callable = Executors.callable(runnable, result);this.state = NEW;}//任务是否被取消了public boolean isCancelled() {return state >= CANCELLED;}//任务是否已经开始执行public boolean isDone() {return state != NEW;}//取消任务的执行public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//任务状态为NEW并且通过CAS替换成INTERRUPTING或CANCELLED 失败//则返回失败return false;try {if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)//中断正在执行任务的线程t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {//通过所有等待结果的线程finishCompletion();}return true;}//堵塞获取结果,关键方法,主要关注下 awaitDone 方法public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)//如果未完成,则进入等待s = awaitDone(false, 0L);return report(s);}//带超时时间的堵塞获取结果,关键方法,主要关注下 awaitDone 方法public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}protected void done() { }//关键方法,回写正常执行结果,并唤醒所有等待的线程protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//CAS加锁成功outcome = v;//正常结果UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // CAS改变任务最终状态:正常完成//唤醒所有等待的线程finishCompletion();}}//异常结果回写protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//CAS加锁成功outcome = t;//异常信息UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // CAS改变任务最终状态:异常//唤醒所有等待的线程finishCompletion();}}//关键方法:任务执行后,会唤醒所有等待的线程 LockSupport.unpark(thread);public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))//防止多个线程执行同一个futureTask任务对象return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//执行核心任务逻辑result = c.call();ran = true;} catch (Throwable ex) {//有异常result = null;ran = false;//回写异常并唤醒所有等待的线程setException(ex);}if (ran) //回写正常结果并唤醒所有等待的线程set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}//特殊用途,先不关注protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {runner = null;s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}private void handlePossibleCancellationInterrupt(int s) {if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield();}//使用了简单链表来记录等待执行结果的线程static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}//移除并唤醒所有等待结果的线程private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;//关键代码,唤醒执行通过 LockSupport.park 进入等待的线程//与awaitDone 方法呼应LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null; // to reduce footprint}//等待任务完成,关键方法,//没有计时的用 LockSupport.park(this);//有计时的用 LockSupport.parkNanos(this, nanos);//这里与 finishCompletion 中的 LockSupport.unpark 呼应private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)//排队,通过cas追加链表节点queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {//移除超时的节点removeWaiter(q);return state;}//关键代码 限时等待LockSupport.parkNanos(this, nanos);}else//关键代码 无限期等待LockSupport.park(this);}}//移除某个等待的节点private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null;retry:for (;;) { // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;if (q.thread != null)pred = q;else if (pred != null) {pred.next = s;if (pred.thread == null) // check for racecontinue retry;}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;//记录 state 变量的内存地址偏移量(相对与futureTask对象起始地址)private static final long runnerOffset;//记录 执行任务的线程 变量的内存地址偏移量private static final long waitersOffset;//记录 等待链表的头节点 变量的内存地址偏移量static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}}
总结:
通过代码分析可以了解到
1)当某个线程调用 get()或者 get(long timeout, TimeUnit unit)方法获取结果时,如果任务还没有完成,则会通过CAS方式将调用线程加入到一个单向链表中,并且通过 LockSupport.park(this);
或者 LockSupport.parkNanos(this, nanos)实现线程自身的堵塞
2)当线程池中线程调度任务后会执行FutureTask类的run()方法,在run方法中会调用 Callable接口的call方法执行真正的任务代码,执行完成后 回写结果或异常到 outcome对象中,并且唤醒所有等待的线程,使用了 LockSupport.unpark(thread); 方法,这与 get中的 LockSupport.park 相呼应
3)为了解决线程安全问题,大量使用了CAS算法,大量应用 Unsafe 类中的CAS方法
4)示意图
今天就到这里,晚安!
