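Concurrency (7): Thread Pools
This article covers how to structure task execution, how thread pools work, and how Future is designed.
Separate tasks from the threads that run them: starting threads by hand with thread.start() is never recommended. Thread execution should be managed in one place, and developers should only supply the task implementations.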
Executor
Executor is an interface that defines how tasks are submitted; a task is represented by the Runnable interface.
public interface Executor {
void execute(Runnable command);
}
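To make the separation concrete, here is a minimal sketch of two Executor implementations in the spirit of the examples in the Executor Javadoc. The class names ExecutorSketch, DirectExecutor and ThreadPerTaskExecutor are illustrative only; the point is that the caller hands over a Runnable and the Executor alone decides which thread runs it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class ExecutorSketch {
    // Runs each task synchronously in the caller's own thread.
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Starts a brand-new thread for every task: the cost a thread pool is meant to avoid.
    static final class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // The caller only describes the task; the Executor decides which thread runs it.
    static Thread threadUsedBy(Executor executor) throws InterruptedException {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            ranOn.set(Thread.currentThread());
            done.countDown();
        });
        done.await();   // wait in case the task runs asynchronously
        return ranOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread me = Thread.currentThread();
        System.out.println(threadUsedBy(new DirectExecutor()) == me);        // true
        System.out.println(threadUsedBy(new ThreadPerTaskExecutor()) == me); // false
    }
}
```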
The class hierarchy of Executor is shown in the figure below:

ExecutorService
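ExecutorService extends the Executor interface and adds more functionality. shutdown() stops accepting new tasks while letting already-submitted tasks run to completion; shutdownNow() attempts to stop the tasks that are currently running, stops processing the tasks still waiting in the queue, and returns those waiting tasks as a list. invokeAll and invokeAny execute a batch of tasks; invokeAny returns the result of one task that completed successfully.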
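The difference between shutdown and shutdownNow can be observed with a small sketch (the class ShutdownNowDemo is hypothetical). Assuming a single-thread pool with one running task and two queued ones, shutdownNow() should interrupt the running task and hand back the two tasks that never started; shutdown() would instead let both queued tasks run.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {
    // Returns {tasks handed back by shutdownNow(), 1 if the running task observed the interrupt else 0}.
    static int[] run() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);            // stands in for a long-running task
            } catch (InterruptedException e) {
                interrupted.countDown();         // shutdownNow() interrupts running workers
            }
        });
        started.await();                          // task 1 is now running, not queued
        pool.execute(() -> { });                  // queued behind task 1
        pool.execute(() -> { });                  // queued behind task 1
        List<Runnable> neverStarted = pool.shutdownNow();  // drains and returns the queue
        boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { neverStarted.size(), sawInterrupt ? 1 : 0 };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("never started: " + r[0] + ", interrupted: " + (r[1] == 1));
    }
}
```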
Compared with Executor's single way of submitting tasks, ExecutorService offers several, namely a family of overloaded submit methods:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
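All three methods return a Future through which the result of the asynchronous task can be obtained; Future exists precisely for retrieving asynchronous results.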

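Future has two particularly important methods: get() blocks the current thread until the asynchronous task has finished and then returns its result, and cancel(boolean) cancels a submitted task.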
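A short sketch of the three submit overloads plus invokeAll and invokeAny, assuming a fixed pool of two threads; SubmitDemo is an illustrative name. Each get() call blocks until the corresponding task has finished.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> a = pool.submit(() -> 20);              // submit(Callable<T>)
            Future<?> b = pool.submit(() -> { });                   // submit(Runnable): get() yields null
            Future<String> c = pool.submit(() -> { }, "fixed");     // submit(Runnable, T result)

            List<Callable<Integer>> jobs = Arrays.<Callable<Integer>>asList(() -> 1, () -> 2, () -> 3);
            int total = 0;
            for (Future<Integer> f : pool.invokeAll(jobs)) {        // invokeAll waits for all jobs
                total += f.get();
            }
            int any = pool.invokeAny(jobs);                         // result of one successful job

            // each get() blocks until its task has finished
            return a.get() + "," + b.get() + "," + c.get() + "," + total + "," + (any >= 1 && any <= 3);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());   // 20,null,fixed,6,true
    }
}
```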
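Design: Runnable is a task without a return value, while Callable, added in JDK 1.5, is a task that returns a value. At the design level, a Runnable can be viewed as a Callable that returns null, so that all tasks can be run uniformly through Callable.call(). This requires an adapter class that adapts the Runnable interface to the Callable interface, and the JDK's Executors class provides such adapter methods: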
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
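Executors.callable is public, so the adapter can be tried directly. In this minimal sketch (CallableAdapterDemo is a hypothetical name), each call() runs the wrapped Runnable once and then returns the fixed result, or null for the one-argument overload.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CallableAdapterDemo {
    static String run() throws Exception {
        AtomicInteger runs = new AtomicInteger();
        Runnable task = runs::incrementAndGet;                           // no result of its own

        Callable<String> withResult = Executors.callable(task, "done");  // returns the given result
        Callable<Object> withNull = Executors.callable(task);            // returns null

        // Each call() runs the wrapped Runnable once, then returns the fixed result.
        return withResult.call() + "," + withNull.call() + "," + runs.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());   // done,null,2
    }
}
```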
AbstractExecutorService
AbstractExecutorService is an abstract implementation; let's focus on the source of its submit methods:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
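Both the task and the returned Future are wrapped in a single RunnableFuture object. It is both an executable task and a Future for retrieving the task's result, because it extends both the Runnable and the Future interfaces: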
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
The execute(ftask) method is left for subclasses to implement. Let's look at newTaskFor:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
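newTaskFor is declared protected, so subclasses can override these two methods to supply their own RunnableFuture implementation. FutureTask is the default implementation; we will analyze its code in depth after covering how ThreadPoolExecutor works.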
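As a sketch of such an override, the hypothetical CountingExecutor below returns an anonymous FutureTask subclass from both newTaskFor methods and counts completions through FutureTask's protected done() hook. Note that only submit, invokeAll and invokeAny go through newTaskFor; a plain execute(Runnable) does not.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingExecutor extends ThreadPoolExecutor {
    final AtomicInteger completed = new AtomicInteger();

    CountingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    // Used by submit(Callable), invokeAll and invokeAny.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable) {
            @Override
            protected void done() {   // FutureTask hook: called once the task is done, whatever the outcome
                completed.incrementAndGet();
            }
        };
    }

    // Used by submit(Runnable) and submit(Runnable, T).
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value) {
            @Override
            protected void done() {
                completed.incrementAndGet();
            }
        };
    }

    static int demo() throws InterruptedException {
        CountingExecutor pool = new CountingExecutor(2);
        pool.submit(() -> 1);
        pool.submit(() -> { });
        pool.submit(() -> { }, "x");
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());   // 3
    }
}
```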
ThreadPoolExecutor
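ThreadPoolExecutor is a concrete subclass of the abstract class AbstractExecutorService; it implements the most important method, public void execute(Runnable command).
ThreadPoolExecutor accepts a stream of submitted tasks and runs them on a bounded or unbounded number of worker threads; managing and running these worker threads is what we call thread pooling. If the number of threads is bounded, surplus tasks wait in a queue to be executed.
Worker threads in the pool are reused: when a new task arrives it can be handed to an already-created thread instead of creating a new one. When there are no tasks, the worker threads block, which is why the queue is a BlockingQueue.
Worker threads can also be destroyed: in particular, when there are many threads but no more tasks are being submitted, destroying threads releases resources.
Initialization
Let's first look at ThreadPoolExecutor's constructor, which exposes the pool's configuration parameters: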
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
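You need to understand what each parameter means before you can use a thread pool well, and before you should use one at all.
1. corePoolSize: the number of core worker threads
A pool starts with 0 worker threads (unless prestartCoreThread() or prestartAllCoreThreads() is called). Each submitted task creates a new worker thread until corePoolSize threads exist, even if existing workers are idle; only after that do subsequent tasks go into the queue instead of creating threads.
2. maximumPoolSize: the maximum number of worker threads
When a task cannot be queued (for example, because the queue has reached its capacity), the pool creates additional worker threads, up to maximumPoolSize in total; beyond that the task is rejected, see the rejection policies in item 6.
3. keepAliveTime & unit: how long the extra worker threads may stay idle
- Worker threads beyond corePoolSize (up to maximumPoolSize) are destroyed after being idle for keepAliveTime. Which threads get destroyed depends not on creation order but on idle time, so a given worker may be reused again and again, or reused for a while and then destroyed.
- If no more tasks are submitted, the number of worker threads settles back to corePoolSize. Core threads can also be allowed to time out after the idle time; the switch is allowCoreThreadTimeOut(true) (allowsCoreThreadTimeOut() only reports the current setting).
- If maximumPoolSize equals corePoolSize and core threads are not allowed to time out, keepAliveTime has no effect.
4. workQueue: the blocking queue of tasks
The BlockingQueue in which tasks wait; its capacity can be specified. For example, new LinkedBlockingQueue<Runnable>() has a capacity of Integer.MAX_VALUE, so in practice it accepts an unlimited number of tasks.
5. threadFactory: the factory that creates worker threads; its default implementation is shown below
6. handler: the policy applied when a task cannot be handled because both the thread count and the queue capacity have reached their limits (it is also applied when the pool has been shut down)
- ThreadPoolExecutor.AbortPolicy: the JDK default; submitting the task throws a RejectedExecutionException
- ThreadPoolExecutor.DiscardPolicy: the task is silently dropped
- ThreadPoolExecutor.DiscardOldestPolicy: the task at the head of the queue is dropped and submission of the new task is retried
- ThreadPoolExecutor.CallerRunsPolicy: the thread that submitted the task runs it itself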
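Two of the rejection policies can be observed with a deliberately tiny pool, assuming corePoolSize = maximumPoolSize = 1 and a SynchronousQueue so that a second task finds neither a free thread nor a queue slot. This is a sketch (RejectionDemo is an illustrative name): CallerRunsPolicy runs the rejected task in the submitting thread, while the default AbortPolicy throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class RejectionDemo {
    static boolean callerRunsInCaller() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        pool.execute(() -> awaitQuietly(release));                  // occupies the only worker
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        pool.execute(() -> ranOn.set(Thread.currentThread()));      // no thread, no queue slot: rejected
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn.get() == Thread.currentThread();               // CallerRunsPolicy ran it right here
    }

    static boolean abortThrows() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());                  // default handler: AbortPolicy
        pool.execute(() -> awaitQuietly(release));
        boolean threw = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException expected) {
            threw = true;
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return threw;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(callerRunsInCaller());   // true
        System.out.println(abortThrows());          // true
    }
}
```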
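How it works: the execute() method
Once you know what the constructor parameters mean, you already understand most of the mechanism. Let's go straight to the source of execute():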
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
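ctl is an atomic integer that uses bit manipulation to record both the pool's run state (runState) and the number of worker threads (workerCount) in a single int: the high 3 bits hold runState and the low 29 bits hold workerCount. workerCountOf(c) is the bit operation that extracts the worker count: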
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
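The packing can be reproduced outside the JDK. This standalone sketch copies the JDK 8 constants shown above and adds ctlOf and isRunning as they appear in the JDK 8 source; CtlPacking itself is a hypothetical class name.

```java
public class CtlPacking {
    // Same constants as the source above: high 3 bits = runState, low 29 bits = workerCount.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int ctlOf(int rs, int wc) { return rs | wc; }        // pack both fields into one int
    static int runStateOf(int c) { return c & ~CAPACITY; }     // keep the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; }   // keep the low 29 bits
    static boolean isRunning(int c) { return c < SHUTDOWN; }   // RUNNING is the only negative state

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));   // "111", then 26 zeros, then "101"
        System.out.println(workerCountOf(c) + " " + (runStateOf(c) == RUNNING) + " " + isRunning(c)); // 5 true true
        int next = c + 1;   // what compareAndIncrementWorkerCount effectively does to ctl
        System.out.println(workerCountOf(next) + " " + (runStateOf(next) == RUNNING));               // 6 true
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```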
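As the comment says, and as the one if statement plus one if-else statement show, execute() works in three steps:
1. if (workerCountOf(c) < corePoolSize): if there are fewer worker threads than corePoolSize, addWorker(command, true) creates a thread directly to run the task, and this task becomes that worker's first task.
2. If creating a worker in step 1 failed (for example, because the thread factory could not create a thread, or because of some exception), or if the number of workers is already at least corePoolSize, the task is queued with workQueue.offer(command). After enqueuing, the pool state is checked again: if the pool is no longer running, the task is removed from the queue and rejected; if it is running but has zero worker threads, addWorker(null, false) creates a worker with no first task.
3. If enqueuing fails, or the pool is not running, execute() tries to create a worker thread. If the number of workers has already reached maximumPoolSize, or the pool is not running, creation fails and the task is rejected. (The "pool not running" check here is somewhat redundant.)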
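The three steps can be watched from the outside. In this sketch (ExecuteOrderDemo is a hypothetical name) the pool has corePoolSize = 1, maximumPoolSize = 2 and an ArrayBlockingQueue of capacity 1, and every task blocks on a latch so nothing finishes early: the first task creates a core worker, the second is queued, the third forces a non-core worker, and the fourth is rejected by the default AbortPolicy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    static List<String> trace() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                  // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));  // AbortPolicy by default
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("pool=" + pool.getPoolSize() + " queue=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(trace());   // [pool=1 queue=0, pool=1 queue=1, pool=2 queue=1, rejected]
    }
}
```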
Next, let's dig into addWorker, which creates a worker thread with a given first task:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
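This code first checks the pool's run state and then, depending on the boolean core parameter, compares the worker count with either corePoolSize or maximumPoolSize: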
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
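It then tries to increment the worker count with compareAndIncrementWorkerCount(c). If the CAS succeeds, it creates and starts a thread; thread creation can still fail at this point, in which case addWorkerFailed(w) is called to roll back. The thread-creation code is encapsulated in the Worker class, as the following stripped-down excerpt shows: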
Worker w = null;
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
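The Worker class wraps a thread, and tasks are run by starting that thread. Worker is the core of the worker-thread machinery and also the hardest part to understand.
Worker: the worker thread
A Worker holds a first task, firstTask, and a Thread object, thread. Once the thread starts, it runs the first task and then repeatedly takes tasks from the blocking queue. If the worker count exceeds corePoolSize (or allowCoreThreadTimeOut is enabled), the worker waits with poll(keepAliveTime) instead of take(); if no task arrives within keepAliveTime, getTask returns null and the thread is destroyed.
The Worker class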
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
That is the full source of Worker. Let's take it apart piece by piece, starting with its declaration:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
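Worker extends the AQS class (we will see how it is used below) and implements Runnable. Next, the constructor stores the first task and uses the thread factory to create a thread: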
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
The Runnable passed to the new thread is the Worker itself, so let's look at Worker.run:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
The worker loop: runWorker
runWorker(this) is where the worker repeatedly takes tasks from the queue and runs them:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
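- runWorker provides two hook methods around each task, beforeExecute and afterExecute, which come up again below.
- Worker extends AQS and holds a non-reentrant exclusive lock around each task execution; why it uses AQS is explained below: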
w.lock();
...
w.unlock();
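A minimal sketch of the same kind of lock, modeled on Worker's tryAcquire/tryRelease shown above (NonReentrantLock is an illustrative name, not a JDK class). Because tryAcquire only CASes the state from 0 to 1 and never checks the owner, even the owning thread cannot acquire it twice. The JDK source comments explain that Worker deliberately avoids ReentrantLock so that a task calling pool control methods such as setCorePoolSize cannot reacquire its own worker's lock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class NonReentrantLock {
    // Same idea as ThreadPoolExecutor.Worker: state 0 = unlocked, 1 = locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;   // no owner check, so the owner cannot re-acquire either
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }
    public boolean tryLock() { return sync.tryAcquire(1); }
    public void unlock() { sync.release(1); }
    public boolean isLocked() { return sync.isHeldExclusively(); }

    static String demo() {
        NonReentrantLock lock = new NonReentrantLock();
        lock.lock();
        boolean reentered = lock.tryLock();       // false: the same thread cannot take it again
        boolean lockedWhileHeld = lock.isLocked();
        lock.unlock();
        boolean afterUnlock = lock.tryLock();     // true once released
        lock.unlock();
        return reentered + "," + lockedWhileHeld + "," + afterUnlock;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // false,true,true
    }
}
```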
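The core of runWorker's task loop is while (task != null || (task = getTask()) != null). When getTask returns null, the loop exits and processWorkerExit(w, completedAbruptly) runs to retire the thread. So when does getTask return null?
Fetching tasks from the blocking queue: getTask
Let's look at getTask: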
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
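getTask's main job is to fetch the next task. Its other job is to return null, which sends the worker into the exit path, when the worker should be retired: when the pool is SHUTDOWN and the queue is empty, or the pool is STOP or later; when the worker count exceeds maximumPoolSize (for example after setMaximumPoolSize lowered it); or when the worker is subject to timeout (allowCoreThreadTimeOut == true or the worker count exceeds corePoolSize) and its last poll timed out, provided it is not the last worker while tasks are still queued.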
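To see the runWorker/getTask contract in isolation, here is a deliberately simplified sketch (MiniPool is hypothetical and is not how ThreadPoolExecutor is implemented): every worker loops on getTask(), getTask() uses poll with a keep-alive timeout, and a null return ends the loop, so the thread dies. Unlike the JDK, it times out every worker (as if allowCoreThreadTimeOut(true) were set) and has no run state or worker count.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final long keepAliveMillis;
    private Thread[] workers = new Thread[0];
    final AtomicInteger completed = new AtomicInteger();

    MiniPool(long keepAliveMillis) {
        this.keepAliveMillis = keepAliveMillis;
    }

    void execute(Runnable task) {
        queue.add(task);
    }

    void start(int size) {
        workers = new Thread[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Thread(this::runWorker, "mini-" + i);
            workers[i].start();
        }
    }

    // Plays the role of getTask(): null tells the worker to exit.
    private Runnable getTask() {
        try {
            return queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS);  // null after keepAlive of idleness
        } catch (InterruptedException e) {
            return null;   // simplification: the JDK loops and re-checks the pool state instead
        }
    }

    // Plays the role of runWorker(): loop until getTask() returns null, then let the thread end.
    private void runWorker() {
        Runnable task;
        while ((task = getTask()) != null) {
            task.run();
            completed.incrementAndGet();
        }
    }

    void awaitWorkersExit() throws InterruptedException {
        for (Thread t : workers) {
            t.join();
        }
    }

    static int demo() throws InterruptedException {
        MiniPool pool = new MiniPool(100);
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> { });   // queue the tasks before any worker starts polling
        }
        pool.start(2);
        pool.awaitWorkersExit();       // each worker exits after 100 ms with nothing to do
        return pool.completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());   // 10
    }
}
```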
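NOTE: If a worker meets the conditions for being destroyed, is it guaranteed to be destroyed within keepAliveTime?
Not necessarily. workQueue.poll and workQueue.take both throw InterruptedException, so they are interruptible. After an interrupt, timedOut is reset to false and the for loop goes around again and blocks anew, which restarts the keepAliveTime wait. For example, ThreadPoolExecutor.setCorePoolSize() interrupts idle workers, because the configuration has changed and they need to pick up the new configuration.
NOTE: We can now answer the question of why Worker extends AQS.
Reason one: a worker that is running a task should not be interrupted, whereas a blocked (idle) worker may be. Here is the interrupt code called by setCorePoolSize: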
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
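When the pool configuration changes, only workers for which w.tryLock() succeeds are interrupted. A worker holds its own lock while running a task (this is why runWorker locks before each task and unlocks after it), so only idle workers blocked in getTask are interrupted; they then pick up the new configuration and block again.
Reason two: isLocked() tells which workers are running a task and which are blocked waiting for one. ThreadPoolExecutor.getActiveCount() is implemented on exactly this principle.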
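Reason two can be checked with a sketch (ActiveCountDemo is an illustrative name): with two prestarted core workers and one task blocked on a latch, getPoolSize() should report 2 while getActiveCount() reports 1, because only the busy worker holds its lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ActiveCountDemo {
    // Returns {getPoolSize(), getActiveCount()} while exactly one task is running.
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.prestartAllCoreThreads();            // two idle workers, both blocked in getTask()
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        started.await();                          // the worker took w.lock() before running the task
        int[] result = { pool.getPoolSize(), pool.getActiveCount() };
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + " activeCount=" + r[1]);   // poolSize=2 activeCount=1
    }
}
```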
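FutureTask as a task: execution
We now know how the pool creates threads, queues tasks, takes tasks from the blocking queue, and destroys threads. From AbstractExecutorService we also know that the tasks submitted through submit() are actually FutureTask instances. Next, let's dig into FutureTask's implementation.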
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// ... (rest omitted)
}
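Because FutureTask is itself a Runnable, it can be run on any thread, not only inside a pool. The sketch below (FutureTaskRunOnce is a hypothetical name) also exercises the state != NEW guard at the top of run(): a second run() call returns immediately, so the Callable executes only once.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureTaskRunOnce {
    static String demo() throws Exception {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(calls::incrementAndGet);
        Thread t = new Thread(task);   // FutureTask is a Runnable, so a plain thread can run it
        t.start();
        t.join();
        task.run();                    // state is no longer NEW, so run() returns immediately
        return task.get() + "," + calls.get() + "," + task.isDone();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());   // 1,1,true
    }
}
```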
As a Runnable, FutureTask runs the task and then stores the result, which is done by set(result):
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
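set(V) first CASes the state from NEW to COMPLETING, stores the result in outcome, sets the final state NORMAL, and then calls finishCompletion() to wake the threads that are waiting for the asynchronous result in Future.get.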
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
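waiters is a linked list (used as a stack) of waiting threads, and LockSupport.unpark(t) performs the wake-up. Next, let's look at the source of Future.get.
FutureTask as a Future
How Future.get works
Future.get first checks whether the FutureTask state is still NEW or COMPLETING (state <= COMPLETING), that is, whether the task has not finished yet; if so, it waits in awaitDone.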
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
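awaitDone is the code that adds the current thread to the wait queue. It is quite clever: a single for loop goes through the if-else branches, taking one step per iteration: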
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
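On successive iterations it first creates a node, q = new WaitNode();, then pushes it onto the wait queue with queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);, and finally blocks the current thread with LockSupport.park until the task finishes, the thread is interrupted, or (for the timed variant) the timeout expires.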
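The set/finishCompletion/awaitDone protocol can be condensed into a small sketch. SimpleFuture is a hypothetical class that swaps Unsafe for AtomicInteger and AtomicReference and omits cancellation, timed waits, the runner CAS and removeWaiter; it keeps the core idea of a CAS-pushed stack of waiting threads, a state re-check after pushing, and park/unpark for blocking and wake-up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class SimpleFuture<V> implements Runnable {
    private static final int NEW = 0, COMPLETING = 1, NORMAL = 2, EXCEPTIONAL = 3;

    // One node per waiting thread, kept on a lock-free (Treiber) stack like FutureTask.waiters.
    private static final class WaitNode {
        final Thread thread = Thread.currentThread();
        WaitNode next;
    }

    private final AtomicInteger state = new AtomicInteger(NEW);
    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();
    private final Callable<V> callable;
    private volatile Object outcome;   // the result, or the Throwable thrown by call()

    public SimpleFuture(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        if (state.get() != NEW) return;    // simplification: concurrent run() calls are not excluded
        Object result;
        int finalState;
        try {
            result = callable.call();
            finalState = NORMAL;
        } catch (Throwable ex) {
            result = ex;
            finalState = EXCEPTIONAL;
        }
        if (state.compareAndSet(NEW, COMPLETING)) {   // like set(): NEW -> COMPLETING -> final state
            outcome = result;
            state.set(finalState);
            // like finishCompletion(): detach the whole stack and wake every waiter
            for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
                LockSupport.unpark(q.thread);
            }
        }
    }

    public V get() throws InterruptedException, ExecutionException {
        WaitNode node = null;
        while (state.get() <= COMPLETING) {
            if (Thread.interrupted()) throw new InterruptedException();
            if (node == null) {                   // like awaitDone(): create the node first,
                node = new WaitNode();
                WaitNode head;
                do {                              // then push it with CAS,
                    head = waiters.get();
                    node.next = head;
                } while (!waiters.compareAndSet(head, node));
                continue;                         // re-check state so a wake-up is never lost,
            }
            LockSupport.park(this);               // and only then park; the loop absorbs spurious returns
        }
        if (state.get() == NORMAL) {
            @SuppressWarnings("unchecked") V value = (V) outcome;
            return value;
        }
        throw new ExecutionException((Throwable) outcome);
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String demo() throws Exception {
        SimpleFuture<Integer> ok = new SimpleFuture<Integer>(() -> 6 * 7);
        new Thread(() -> { sleepQuietly(50); ok.run(); }).start();
        int value = ok.get();                     // parks until the other thread completes the task
        SimpleFuture<Integer> bad = new SimpleFuture<Integer>(() -> { throw new IllegalStateException("boom"); });
        bad.run();
        String failure;
        try {
            bad.get();
            failure = "none";
        } catch (ExecutionException e) {
            failure = e.getCause().getClass().getSimpleName();
        }
        return value + "," + failure;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());   // 42,IllegalStateException
    }
}
```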
The code of report(s) is relatively simple:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
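report checks whether the state is at least CANCELLED. FutureTask's states are:
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
How Future.cancel works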
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
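Here the runner field is the thread currently running the task; it is set in FutureTask.run().
The idea behind cancel is: if mayInterruptIfRunning is true, the thread running the task is interrupted. Either way, the task's state ends up as CANCELLED or INTERRUPTED (INTERRUPTING is only a transient state on the way to INTERRUPTED), so a later get throws CancellationException, as report(s) shows.
Question: if cancel(true) is called on a FutureTask, does the task stop immediately?
Not necessarily. Even with true, calling t.interrupt() on the thread only sets its interrupt flag, or makes methods such as wait and sleep throw InterruptedException; the task's own code has to implement the logic for responding to the interrupt.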
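Cooperative cancellation can be sketched as follows (CancelDemo is an illustrative name). The task never blocks, so no InterruptedException is ever thrown; it stops only because it polls Thread.currentThread().isInterrupted(). A task that ignored the flag would keep running after cancel(true), even though its Future already reports isCancelled().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelDemo {
    static String run() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        Future<?> f = pool.submit(() -> {
            started.countDown();
            // cancel(true) only sets the interrupt flag; the task must check it and stop by itself
            while (!Thread.currentThread().isInterrupted()) {
                // busy work that never blocks, so no InterruptedException is thrown
            }
            exited.countDown();
        });
        started.await();
        boolean cancelled = f.cancel(true);
        boolean stopped = exited.await(5, TimeUnit.SECONDS);
        String getResult;
        try {
            f.get();
            getResult = "value";
        } catch (CancellationException e) {
            getResult = "CancellationException";
        } catch (ExecutionException e) {
            getResult = "ExecutionException";
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return cancelled + "," + f.isCancelled() + "," + stopped + "," + getResult;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());   // true,true,true,CancellationException
    }
}
```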
Hooks
The pool provides three hooks that can be used to monitor and observe task execution:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
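A sketch of using the hooks (HookedPool is a hypothetical subclass): it counts beforeExecute and afterExecute calls and records that terminated() ran. For tasks sent through submit(), the FutureTask catches the task's exception itself, so afterExecute receives t == null there.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    volatile boolean terminatedCalled;

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {   // runs in the worker thread before r.run()
        super.beforeExecute(t, r);
        before.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {  // t is non-null if r.run() threw
        super.afterExecute(r, t);
        after.incrementAndGet();
    }

    @Override
    protected void terminated() {   // called once, after shutdown, when all workers have exited
        super.terminated();
        terminatedCalled = true;
    }

    static String demo() throws InterruptedException {
        HookedPool pool = new HookedPool(2);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.before.get() + "," + pool.after.get() + "," + pool.terminatedCalled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());   // 3,3,true
    }
}
```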
ThreadFactory
The JDK provides a default worker-thread factory, Executors.defaultThreadFactory():
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
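A common customization is to keep the default factory's behavior but give the threads a meaningful name, which helps when reading thread dumps. This is a sketch; NamedThreadFactory and the prefix "order-worker" are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate = Executors.defaultThreadFactory();
    private final AtomicInteger counter = new AtomicInteger(1);
    private final String prefix;

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);   // non-daemon, NORM_PRIORITY, as in DefaultThreadFactory
        t.setName(prefix + "-" + counter.getAndIncrement());
        return t;
    }

    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1, new NamedThreadFactory("order-worker"));
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());   // order-worker-1
    }
}
```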
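The task queue: workQueue (BlockingQueue)
There are usually three queuing strategies to choose from:
1. Unbounded queues
Typically a LinkedBlockingQueue. Once corePoolSize workers exist, every new task goes into the queue, so an unbounded queue makes the maximumPoolSize parameter ineffective. With an unbounded queue we must consider the risk of resource exhaustion, in particular running out of memory as the queue keeps growing.
2. Bounded queues
Help prevent resource exhaustion, but the queue capacity and the pool size have to be balanced carefully to get good throughput. We must also decide on the rejection policy for when the thread count and the queue capacity reach their limits.
3. Direct handoffs
A blocking queue that holds no elements and merely hands each task over to a worker thread, such as SynchronousQueue. It is usually combined with an unbounded maximumPoolSize so that tasks are not rejected, so we must watch out for the bottleneck caused by too many threads.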
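The effect of the first and third strategies on pool size can be sketched with blocking tasks (QueueStrategyDemo is an illustrative name; the bounded case behaves as described in the execute() walkthrough earlier). With an unbounded LinkedBlockingQueue the pool stays at corePoolSize = 1 even though maximumPoolSize is 10, while with a SynchronousQueue and corePoolSize = 0 each blocked task forces a new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueStrategyDemo {
    // Submits n tasks that block until released; returns "poolSize/queueSize" observed meanwhile.
    static String fill(ThreadPoolExecutor pool, int n) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        String seen = pool.getPoolSize() + "/" + pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return seen;
    }

    static String unbounded() throws InterruptedException {
        // maximumPoolSize = 10 is never reached: offer() into an unbounded queue always succeeds
        return fill(new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()), 5);
    }

    static String handoff() throws InterruptedException {
        // no capacity at all: every busy moment needs a new thread, up to maximumPoolSize
        return fill(new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()), 5);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unbounded: " + unbounded());   // unbounded: 1/4
        System.out.println("handoff: " + handoff());       // handoff: 5/0
    }
}
```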
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor is a thread pool for scheduled tasks; it extends the ThreadPoolExecutor class and implements the ScheduledExecutorService interface.
ScheduledExecutorService
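ScheduledExecutorService defines a set of scheduling methods, including running a task once after a delay and running it periodically at a fixed rate or with a fixed delay.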
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
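Initialization
Let's go straight to the constructor: it calls ThreadPoolExecutor's constructor to create a pool whose maximum thread count is Integer.MAX_VALUE and whose queue is a DelayedWorkQueue.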
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
DelayedWorkQueue is an internal, heap-based blocking queue that keeps tasks ordered by their scheduled execution time.
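A short sketch of the two kinds of scheduling (ScheduledDemo is a hypothetical name): a one-shot Callable delayed by 50 ms, and a fixed-rate task that counts down a latch three times before it is cancelled. Periodic tasks keep running until they are cancelled or the executor is shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    static String run() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            ScheduledFuture<String> once = scheduler.schedule(() -> "done", 50, TimeUnit.MILLISECONDS);
            String value = once.get();                               // returns no earlier than 50 ms
            long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

            CountDownLatch threeTicks = new CountDownLatch(3);
            ScheduledFuture<?> periodic = scheduler.scheduleAtFixedRate(
                    threeTicks::countDown, 0, 20, TimeUnit.MILLISECONDS);
            boolean ticked = threeTicks.await(5, TimeUnit.SECONDS);
            periodic.cancel(false);                                  // stop the periodic task
            return value + "," + (waitedMs >= 50) + "," + ticked;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());   // done,true,true
    }
}
```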
ScheduledFutureTask
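The tasks here are not plain FutureTask instances but ScheduledFutureTask, a subclass of FutureTask. ScheduledFutureTask was already covered in "Concurrency (6): Collections", in the discussion of DelayQueue and of how to implement the Delayed interface.
The Executors factory class
ThreadPoolExecutor has many parameters, so pools can also be built through factory methods; the JDK provides the utility class Executors for creating thread pools.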
newFixedThreadPool
A pool of fixed size nThreads with an unbounded blocking queue: at most nThreads threads run tasks at the same time, and new tasks queue up.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor
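A pool with only one worker thread and an unbounded blocking queue. It is similar to newFixedThreadPool(1), but the key difference is that the pool returned by newSingleThreadExecutor cannot be reconfigured.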
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
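The difference is easy to check (ReconfigureDemo is an illustrative name): the pool from newFixedThreadPool(1) is a ThreadPoolExecutor and can be resized after a cast, while newSingleThreadExecutor returns a delegating wrapper that is not a ThreadPoolExecutor, so there is nothing to cast to. The sketch raises maximumPoolSize before corePoolSize because some newer JDKs reject a core size larger than the current maximum.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ReconfigureDemo {
    static String run() {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            String fixedResult = "not a ThreadPoolExecutor";
            if (fixed instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) fixed;
                tpe.setMaximumPoolSize(4);   // raise the maximum first so core <= max always holds
                tpe.setCorePoolSize(4);
                fixedResult = "core=" + tpe.getCorePoolSize();
            }
            boolean singleIsTpe = single instanceof ThreadPoolExecutor;   // false: a delegating wrapper
            return fixedResult + "," + singleIsTpe;
        } finally {
            fixed.shutdown();
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());   // core=4,false
    }
}
```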
newCachedThreadPool
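Creates worker threads as needed and reuses idle ones, so the number of threads can grow up to Integer.MAX_VALUE; it uses the handoff queue SynchronousQueue. Threads that stay idle for more than 60 seconds are destroyed and their resources reclaimed, down to zero worker threads.
newCachedThreadPool is typically used for short-lived tasks; when using it, you must consider the problems that a growing number of threads can cause.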
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newWorkStealingPool
A work-stealing pool built on ForkJoinPool, which will be covered in the next article on Fork/Join.
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
newScheduledThreadPool
A thread pool for scheduled tasks.
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
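Summary
Only by understanding how thread pools work internally can you really use them well. It is recommended to create pools with ThreadPoolExecutor's constructor, because doing so gives you a better understanding of the pool you are creating.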