你一定会需要的FutureTask在线程池中应用和源码解析

你一定会需要的FutureTask在线程池中应用和源码解析

FutureTask 是一个支持取消的异步处理器,一般在线程池中用于异步接受callable返回值。

主要实现分三部分:

  1. 封装 Callable,然后放到线程池中去异步执行->run。
  2. 获取结果-> get。
  3. 取消任务-> cancel。

接下来主要学习下该模型如何实现。

举例说明FutureTask在线程池中的应用

// 第一步,定义线程池,
ExecutorService executor = new ThreadPoolExecutor(
 minPoolSize,
 maxPollSize,
 keepAliveTime,
 TimeUnit.SECONDS,
 new SynchronousQueue<>());
 
// 第二步,放到线程池中执行,返回FutureTask
FutureTask task = executor.submit(callable);
 
// 第三步,获取返回值
T data = task.get();

学习FutureTask实现

类属性

//以下是FutureTask的各种状态
private volatile int state; 
private static final int NEW = 0; 
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
 
private Callable<V> callable; //执行的任务
private Object outcome; //存储结果或者异常
private volatile Thread runner;//执行callable的线程
private volatile WaitNode waiters; //调用get方法等待获取结果的线程栈

其中各种状态存在 最终状态 status>COMPLETING

1)NEW -> COMPLETING -> NORMAL(有正常结果)

2) NEW -> COMPLETING -> EXCEPTIONAL(结果为异常)

3) NEW -> CANCELLED(无结果)

4) NEW -> INTERRUPTING -> INTERRUPTED(无结果)

类方法

从上面举例说明开始分析。

run()方法

FutureTask 继承 Runnable,ExecutorService submit 把提交的任务封装成 FutureTask 然后放到线程池 ThreadPoolExecutor 的 execute 执行。

public void run() {
 //如果不是初始状态或者cas设置运行线程是当前线程不成功,直接返回
 if (state != NEW ||
 !UNSAFE.compareAndSwapObject(this, runnerOffset,
 null, Thread.currentThread()))
 return;
 try {
 Callable<V> c = callable;
 if (c != null && state == NEW) {
 V result;
 boolean ran;
 try {
 // 执行callable任务 这里对异常进行了catch
 result = c.call();
 ran = true;
 } catch (Throwable ex) {
 result = null;
 ran = false;
 setException(ex); // 封装异常到outcome
 }
 if (ran)
 set(result);
 }
 } finally {
 runner = null;
 int s = state;
 // 这里如果是中断中,设置成最终状态
 if (s >= INTERRUPTING)
 handlePossibleCancellationInterrupt(s);
 }
}

以上是 run 方法源码实现很简单,解析如下:

  1. 如果不是始状态或者 cas 设置运行线程是当前线程不成功,直接返回,防止多个线程重复执行。
  2. 执行 Callable 的 call(),即提交执行任务(这里做了catch,会捕获执行任务的异常封装到 outcome 中)。
  3. 如果成功执行 set 方法,封装结果。

set 方法

protected void set(V v) {
 //cas方式设置成completing状态,防止多个线程同时处理
 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 outcome = v; // 封装结果
 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终设置成normal状态
 
 finishCompletion();
 }
}

解析如下:

  1. cas方式设置成completing状态,防止多个线程同时处理
  2. 封装结果到outcome,然后设置到最终状态normal
  3. 执行finishCompletion方法。

finishCompletion方法

// state > COMPLETING; 不管异常,中断,还是执行完成,都需要执行该方法来唤醒调用get方法阻塞的线程
private void finishCompletion() {
 // assert state > COMPLETING;
 for (WaitNode q; (q = waiters) != null;) {
 // cas 设置waiters为null,防止多个线程执行。
 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
 // 循环唤醒所有等待结果的线程
 for (;;) {
 Thread t = q.thread;
 if (t != null) {
 q.thread = null;
 //唤醒线程
 LockSupport.unpark(t);
 }
 WaitNode next = q.next;
 if (next == null)
 break;
 q.next = null; // unlink to help gc
 q = next;
 }
 break;
 }
 }
 //该方法为空,可以被重写
 done();
 callable = null; // to reduce footprint
}

解析如下:

遍历waiters中的等待节点,并通过 LockSupport 唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能 cancel,异常等)。

以上就是执行的过程,接下来分析获取结果的过程->get。

get 方法

public V get() throws InterruptedException, ExecutionException {
 int s = state;
 if (s <= COMPLETING)
 s = awaitDone(false, 0L);
 return report(s);
}
public V get(long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException {
 if (unit == null)
 throw new NullPointerException();
 int s = state;
 if (s <= COMPLETING &&
 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
 throw new TimeoutException();
 return report(s);
 }

解析如下:

以上两个方法,原理一样,其中一个设置超时时间,支持最多阻塞多长时间。

状态如果小于 COMPLETING,说明还没到最终状态,(不管是否是成功、异常、取消)。

调用 awaitDone 方法阻塞线程,最终调用 report 方法返回结果。

awaitDone 方法

private int awaitDone(boolean timed, long nanos)
 throws InterruptedException {
 final long deadline = timed ? System.nanoTime() + nanos : 0L;
 WaitNode q = null;
 boolean queued = false;
 for (;;) {
 //线程可中断,如果当前阻塞获取结果线程执行interrupt()方法,则从队列中移除该节点,并抛出中断异常
 if (Thread.interrupted()) {
 removeWaiter(q);
 throw new InterruptedException();
 }
 int s = state;
 // 如果已经是最终状态,退出返回
 if (s > COMPLETING) {
 if (q != null)
 q.thread = null;
 return s;
 }
 //这里做了个优化,competiting到最终状态时间很短,通过yield比挂起响应更快。
 else if (s == COMPLETING) // cannot time out yet
 Thread.yield();
 // 初始化该阻塞节点
 else if (q == null)
 q = new WaitNode();
 // cas方式写到阻塞waiters栈中
 else if (!queued)
 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
 q.next = waiters, q);
 // 这里做阻塞时间处理。
 else if (timed) {
 nanos = deadline - System.nanoTime();
 if (nanos <= 0L) {
 removeWaiter(q);
 return state;
 }
 // 阻塞线程,有超时时间
 LockSupport.parkNanos(this, nanos);
 }
 else
 // 阻塞线程
 LockSupport.park(this);
 }
 }

解析如下:

整体流程已写到注解中,整体实现是放在一个死循环中,唯一出口,是达到最终状态。

然后是构建节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。

report 方法

private V report(int s) throws ExecutionException {
 Object x = outcome;
 if (s == NORMAL)
 return (V)x;
 if (s >= CANCELLED)
 throw new CancellationException();
 throw new ExecutionException((Throwable)x);
}

然后是report方法,如果是正常结束,返回结果,如果不是正常结束(取消,中断)抛出异常。

最后分析下取消流程。

cancel 方法

public boolean cancel(boolean mayInterruptIfRunning) {
 if (!(state == NEW &&
 UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
 mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 return false;
 try { // in case call to interrupt throws exception
 if (mayInterruptIfRunning) {
 try {
 Thread t = runner;
 if (t != null)
 t.interrupt();
 } finally { // final state
 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
 }
 }
 } finally {
 finishCompletion();
 }
 return true;
}

解析如下:

mayInterruptIfRunning参数是是否允许运行中被中断取消。

  1. 根据入参是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则直接返回false。
  2. 如果允许运行中被中断取消,调用runner.interupt()进行中断取消,设置状态为INTERRUPTED。
  3. 唤醒所有在get()方法等待的线程。

此处有两种状态转换:

  1. 如果mayInterruptIfRunning为true:status状态转换为 new -> INTERRUPTING->INTERRUPTED。主动去中断执行线程,然后唤醒所有等待结果的线程。
  2. 如果mayInterruptIfRunning为false:status状态转换为 new -> CANCELLED。

不会去中断执行线程,直接唤醒所有等待结果的线程,从 awaitDone 方法中可以看到,唤醒等待线程后,直接从跳转回 get 方法,然后把结果返回给获取结果的线程,当然此时的结果是 null。

总结

以上就是 FutureTask 的源码简单解析,实现比较简单,FutureTask 就是一个实现 Future 模式,支持取消的异步处理器。

相关推荐