sayi.github.com icon indicating copy to clipboard operation
sayi.github.com copied to clipboard

并发(七)线程池

Open Sayi opened this issue 7 years ago • 0 comments

本文介绍如何结构化执行任务、线程池技术和Future的设计。

任务和线程执行分离 :永远不推荐手动调用thread.start()方法去启动线程,线程的执行应该统一去管理,开发者仅仅提供任务的实现即可。

Executor

Executor是一个定义了如何提交任务的接口,任务通过Runnable接口表示。

public interface Executor {
  void execute(Runnable command);
}

我们来看看Executor的类继承关系图:

image

ExecutorService

ExecutorService继承了Executor接口,实现了更多的功能,shutdown方法可以禁止继续提交任务去执行,已经提交的任务还可以继续执行,shutdownNow方法会尝试停止已经提交的任务,invokeAll和invokeAny支持批量执行任务,invokeAny方法当有一个任务成功执行结束时会返回这个任务执行的结果。

相比于Executor提交任务的方式,ExecutorService提供了更多的方式,即一系列重载submit的方法:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

这三个方法都可以获取异步任务执行的结果,而Future就是为了获取异步结果而设计的。

image

Future有两个很重要的方法,get()方法可以阻塞当前线程,直到获取到异步任务执行结束后返回的结果,cancle(boolean)方法可以取消一个提交的任务。

设计: Runnable是一个没有返回值的任务,Callable是JDK1.5新增的有返回值的任务,在设计层面,我们可以把Runnable看成是一个返回NULL的Callable,代码统一通过Callable.call()来运行,所以需要一个适配器类将Runnable接口适配成Callable接口。JDK的Executors类提供了这样的适配方法:

public static <T> Callable<T> callable(Runnable task, T result) {
  if (task == null)
    throw new NullPointerException();
  return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
  if (task == null)
    throw new NullPointerException();
  return new RunnableAdapter<Object>(task, null);
}
static final class RunnableAdapter<T> implements Callable<T> {
  final Runnable task;
  final T result;
  RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
  }
  public T call() {
    task.run();
    return result;
  }
}

AbstractExecutorService

AbstractExecutorService是一个抽象实现,我们重点看下submit方法的源码实现:

public Future<?> submit(Runnable task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<Void> ftask = newTaskFor(task, null);
  execute(ftask);
  return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task, result);
  execute(ftask);
  return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}

任务task和Future返回值都封装成了同一个对象RunnableFuture,它既是一个可执行的任务,也代表了一个Future对象,获取异步任务执行的结果,继承了Runnable和Future接口:

public interface RunnableFuture<V> extends Runnable, Future<V> {
  void run();
}

execute(ftask)方法留作子类实现,我们进入newTaskFor方法:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  return new FutureTask<T>(callable);
}

newTaskFor被修饰为protected,它可以被子类重写,子类可以重写这两个方法来定义自己的RunnableFuture实现 ,FutureTask是默认实现,在ThreadPoolExecutor原理中我们会深入分析其代码。

ThreadPoolExecutor

ThreadPoolExecutor是抽象类AbstractExecutorService的实现,它实现了最重要的方法public void execute(Runnable command)

ThreadPoolExecutor支持提交一系列任务,并且支持有限个或者无限个工作线程去执行任务,这些工作线程的管理和执行我们称之为:线程池技术。如果线程池是有限的,那么多余的任务将会排队,等待被执行。

线程池里面的工作线程是可以被复用的,当某个任务来了之后可以交给已经创建好的线程执行,而不用再去创建线程。当没有任何任务的时候,这些工作线程将会被阻塞,所以排队的队列是一个阻塞队列BlockingQueue。

线程池里面的工作线程也是可以被销毁的,尤其当线程个数太多而没有继续提交任务时,销毁线程可以释放资源。

初始化

我们先来看看ThreadPoolExecutor的构造函数,它提供了线程池的一些配置参数:

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *    if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *    pool
 * @param keepAliveTime when the number of threads is greater than
 *    the core, this is the maximum time that excess idle threads
 *    will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *    executed.  This queue will hold only the {@code Runnable}
 *    tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *    creates a new thread
 * @param handler the handler to use when execution is blocked
 *    because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *     {@code corePoolSize < 0}<br>
 *     {@code keepAliveTime < 0}<br>
 *     {@code maximumPoolSize <= 0}<br>
 *     {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *     or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
              int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
              RejectedExecutionHandler handler) {
  if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

理解每个参数的含义,才能真正用好线程池,才能有资格使用线程池。

  1. corePoolSize 核心工作线程数
    线程池的初始工作线程个数为0,当提交任务后,就会创建工作线程,最多创建corePoolSize个工作线程,后续提交的任务才会进入队列排队而不会去创建线程

  2. maximumPoolSize 最大工作线程数
    当排队失败,无法入队时(比如到达队列容量上限) ,就会创建最大不超过maximumPoolSize大小的工作线程,否则会被丢弃,丢弃策略见第6点。

  3. keepAliveTime & unit 额外工作线程允许空闲时间

  • 超过corePoolSize且小于maximumPoolSize这部分数量的工作线程在空闲了指定keepAliveTime时间后,将会被销毁。销毁哪些工作线程不是根据创建顺序,而是根据空闲时间,所以可能一个工作线程一直被复用,也有可能被复用后又被销毁

  • 如果一直没有任务提交,那么工作线程个数将会维持在corePoolSize个。核心数目的工作线程也可以指定空闲时间后被销毁,开关方法为:allowsCoreThreadTimeOut()

  • 如果maximumPoolSize和corePoolSize相同,并且没有允许核心数目线程可以被销毁,那么keepAliveTime将毫无意义。

  1. workQueue 任务阻塞队列
    任务排队的BlockingQueue,它可以指定队列的容量,比如new LinkedBlockingQueue<Runnable>()表示可以提交无限的任务。

  2. threadFactory 创建工作线程的工厂,下文会看到默认的实现

  3. handler 当线程个数和阻塞队列容量到达边界,无法处理任务时候的策略

  • ThreadPoolExecutor.AbortPolicy:是JDK默认策略,提交任务时将会抛出一个RejectedExecutionException异常
  • ThreadPoolExecutor.DiscardPolicy:直接被丢弃,不作任何处理
  • ThreadPoolExecutor.DiscardOldestPolicy:队列第一个元素将会被丢弃
  • ThreadPoolExecutor.CallerRunsPolicy:提交任务的线程自己执行这个任务

原理:execute()方法

我们知道初始化参数的含义,其实已经弄清楚了原理,我们直接看execute()方法源码:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 */
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  /*
   * Proceed in 3 steps:
   *
   * 1. If fewer than corePoolSize threads are running, try to
   * start a new thread with the given command as its first
   * task.  The call to addWorker atomically checks runState and
   * workerCount, and so prevents false alarms that would add
   * threads when it shouldn't, by returning false.
   *
   * 2. If a task can be successfully queued, then we still need
   * to double-check whether we should have added a thread
   * (because existing ones died since last checking) or that
   * the pool shut down since entry into this method. So we
   * recheck state and if necessary roll back the enqueuing if
   * stopped, or start a new thread if there are none.
   *
   * 3. If we cannot queue task, then we try to add a new
   * thread.  If it fails, we know we are shut down or saturated
   * and so reject the task.
   */
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  else if (!addWorker(command, false))
    reject(command);
}

ctl是一个原子整型变量,采用位操作用一个整型变量记录了线程池的运行状态runState和工作线程数量workerCount。workerCountOf(c)方法就是具体的位操作获得工作线程数:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }

execute()方法从注释中或者通过一个if语句和一个if-else语句可以看出,做了三步事:

  1. if (workerCountOf(c) < corePoolSize)如果当前工作线程小于核心线程,就会通过addWorker(command, true)直接创建线程执行任务,这个任务就是这个工作线程的第一个任务。
  2. 第一步当中创建工作线程失败(比如工厂创建线程失败或者因为某些异常),或者工作线程大于核心线程数,就会入队workQueue.offer(command),入队后会再次检查线程池状态,如果线程池不是运行状态,则出队且丢弃任务,如果是运行状态且当前线程池工作线程个数为0,则addWorker(null, false)创建一个空任务的工作线程。
  3. 如果入队失败,或者线程池不在运行状态,就会尝试创建工作线程,如果当前工作线程已经达到maximumPoolSize个数上限或者线程池不在运行状态就会创建失败,丢弃任务,这里判断线程池不在运行状态代码有点重复了。

接下来,我们深入addWork方法(用来创建带有第一个执行任务的工作线程):

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
      ! (rs == SHUTDOWN &&
         firstTask == null &&
         ! workQueue.isEmpty()))
      return false;

    for (;;) {
      int wc = workerCountOf(c);
      if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      if (compareAndIncrementWorkerCount(c))
        break retry;
      c = ctl.get();  // Re-read ctl
      if (runStateOf(c) != rs)
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int rs = runStateOf(ctl.get());

        if (rs < SHUTDOWN ||
          (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
          workers.add(w);
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start();
        workerStarted = true;
      }
    }
  } finally {
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

这段代码首先会判断线程池运行状态,然后根据参数boolean core来决定是要判断是比较核心线程数还是最大工作线程数:

if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
    return false;

接着会尝试将工作线程数加一compareAndIncrementWorkerCount(c),如果成功累加,则会创建线程并启动,这时候创建线程有可能会失败,则调用addWorkerFailed(w)方法。创建线程的代码封装在Worker类中,如下面这段缩减了多余代码所示:

Worker w = null;
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();

Worker类封装了一个线程thread,通过这个线程来启动执行任务,Worker就是整个工作线程的核心,也是最难理解的一部分。

Worker工作线程

Worker类包含了第一个任务firstTask和一个线程对象thread,Worker类的thread启动后,会拿第一个任务执行,然后循环从阻塞队列中take任务执行,如果当前线程超过核心工作线程数,那么当take任务阻塞时间超过keepAliveTime后,就会返回NULL,进而销毁这个线程。

Worker类

private final class Worker
  extends AbstractQueuedSynchronizer
  implements Runnable
{
  /**
   * This class will never be serialized, but we provide a
   * serialVersionUID to suppress a javac warning.
   */
  private static final long serialVersionUID = 6138294804551838833L;

  /** Thread this worker is running in.  Null if factory fails. */
  final Thread thread;
  /** Initial task to run.  Possibly null. */
  Runnable firstTask;
  /** Per-thread task counter */
  volatile long completedTasks;

  /**
   * Creates with given first task and thread from ThreadFactory.
   * @param firstTask the first task (null if none)
   */
  Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
  }

  /** Delegates main run loop to outer runWorker  */
  public void run() {
    runWorker(this);
  }

  // Lock methods
  //
  // The value 0 represents the unlocked state.
  // The value 1 represents the locked state.

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()    { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()    { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

上面是Worker的源码,我们来一点点分解来看,我们先来看看定义:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

Worker继承了AQS类(下文会提到用法),实现了Runnable接口。接着看构造器,初始化了一个任务和使用线程工厂创建一个线程:

Worker(Runnable firstTask) {
  setState(-1); // inhibit interrupts until runWorker
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this);
}

创建线程的任务是当前Worker对象,我们接着看Worker.run方法:

/** Delegates main run loop to outer runWorker  */
public void run() {
  runWorker(this);
}

工作线程循环执行:runWorker

其中runWorker(this)方法就是循环从队列获取任务进行执行:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    while (task != null || (task = getTask()) != null) {
      w.lock();
      // If pool is stopping, ensure thread is interrupted;
      // if not, ensure thread is not interrupted.  This
      // requires a recheck in second case to deal with
      // shutdownNow race while clearing interrupt
      if ((runStateAtLeast(ctl.get(), STOP) ||
         (Thread.interrupted() &&
          runStateAtLeast(ctl.get(), STOP))) &&
        !wt.isInterrupted())
        wt.interrupt();
      try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          afterExecute(task, thrown);
        }
      } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}
  • runWorker方法可以看到,在线程执行前后提供了两个钩子方法:beforeExecuteafterExecute,下文会再次提到

  • Worker实现了AQS类,在任务执行前后加了不可重入的排它锁,为什么实现AQS,接着看下文中

w.lock();
...
w.unlock();

runWorker方法中从阻塞队列取任务的核心代码是:while (task != null || (task = getTask()) != null),当getTask方法返回NULL时就会退出循环,最终执行线程销毁工作processWorkerExit(w, completedAbruptly),那么什么时候返回NULL呢?

阻塞队列获取任务:getTask

我们进入getTask方法:

private Runnable getTask() {
  boolean timedOut = false; // Did the last poll() time out?

  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    int wc = workerCountOf(c);

    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    if ((wc > maximumPoolSize || (timed && timedOut))
      && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

getTask方法的核心作用就是取任务元素,还有一个作用就是当allowCoreThreadTimeOut==true 或者核心线程数超过corePoolSize的时候返回NULL,进入线程销毁流程。

NOTE:如果符合销毁条件,是否一个工作线程会在keepAliveTime时间内一定被销毁呢?
不一定,因为workQueue.poll或者workQueue.take都会抛出中断异常,所以他们是可中断的,中断后又会接着执行for循环重新阻塞获取任务,比如ThreadPoolExecutor.setCorePoolSize()方法就会导致工作线程中断,因为配置被更改,需要中断这些线程来更新配置。

NOTE:我们现在就可以来解释为什么Worker需要实现AQS这个问题?
原因一 是不希望正在执行任务的工作线程被中断,阻塞住的线程可以被中断,我们来看看setCorePoolSize方法调用的中断线程的代码:

private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

当我们更改了线程池的配置时,如果尝试获取锁成功w.tryLock(),那些阻塞住的线程会被中断(所以执行任务的前后都会加锁和释放锁),然后更新配置,重新阻塞。

原因二 是我们可以通过isLock()方法来判断哪些工作线程是在执行任务,哪些工作线程是在阻塞等待。ThreadPoolExecutor.getActiveCount()方法的源码就是这个原理。

FutureTask作为一个Task:执行

综上,我们知道了线程池如何创建线程,对任务进行排队,从阻塞队列取线程以及销毁线程的原理,通过AbstractExecutorService我们也知道,这些Runnable任务其实一个FutureTasK类型,接下来我们深入FutureTask的实现。

public class FutureTask<V> implements RunnableFuture<V> {

  public FutureTask(Callable<V> callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;     // ensure visibility of callable
  }
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;     // ensure visibility of callable
  }
  public void run() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return;
    try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          setException(ex);
        }
        if (ran)
          set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
  }
  // 略
}

FutureTask作为Runnable属性就是执行任务,然后将结果保存,具体方法是set(result):

protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
  }
}

set(V)方法会将当前任务的执行结果标记为COMPLETING,然后通过finishCompletion()方法唤醒那些通过Future.get获取异步结果的线程。

private void finishCompletion() {
  // assert state > COMPLETING;
  for (WaitNode q; (q = waiters) != null;) {
    if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
      for (;;) {
        Thread t = q.thread;
        if (t != null) {
          q.thread = null;
          LockSupport.unpark(t);
        }
        WaitNode next = q.next;
        if (next == null)
          break;
        q.next = null; // unlink to help gc
        q = next;
      }
      break;
    }
  }
  done();
  callable = null;    // to reduce footprint
}

waiters是一个等待队列的链表,LockSupport.unpark(t)执行了唤醒操作,接下来我们就来看看Future.get的源码。

FutureTask作为一个Future

Future.get原理

Future.get首先都会判断FutureTask的状态标记是否为COMPLETING

public V get() throws InterruptedException, ExecutionException {
  int s = state;
  if (s <= COMPLETING)
    s = awaitDone(false, 0L);
  return report(s);
}

awaitDones就是将当前线程加入等待队列的代码,这段代码相当有技巧性,通过一个for循环不断来处理每个if-else语句:

private int awaitDone(boolean timed, long nanos)
  throws InterruptedException {
  final long deadline = timed ? System.nanoTime() + nanos : 0L;
  WaitNode q = null;
  boolean queued = false;
  for (;;) {
    if (Thread.interrupted()) {
      removeWaiter(q);
      throw new InterruptedException();
    }

    int s = state;
    if (s > COMPLETING) {
      if (q != null)
        q.thread = null;
      return s;
    }
    else if (s == COMPLETING) // cannot time out yet
      Thread.yield();
    else if (q == null)
      q = new WaitNode();
    else if (!queued)
      queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                         q.next = waiters, q);
    else if (timed) {
      nanos = deadline - System.nanoTime();
      if (nanos <= 0L) {
        removeWaiter(q);
        return state;
      }
      LockSupport.parkNanos(this, nanos);
    }
    else
      LockSupport.park(this);
  }
}

首先会创建一个节点q = new WaitNode();,然后将节点加入等待队列queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);,最后都会通过LockSupport.park阻塞当前线程,直到任务执行结束或者被中断。

report(s)的代码相对简单:

private V report(int s) throws ExecutionException {
  Object x = outcome;
  if (s == NORMAL)
    return (V)x;
  if (s >= CANCELLED)
    throw new CancellationException();
  throw new ExecutionException((Throwable)x);
}

其中有个判断标记是否大于CANCELLED,这些状态有:

private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

Future.cancel原理

public boolean cancel(boolean mayInterruptIfRunning) {
  if (!(state == NEW &&
      UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;
  try {  // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {
      try {
        Thread t = runner;
        if (t != null)
          t.interrupt();
      } finally { // final state
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
      }
    }
  } finally {
    finishCompletion();
  }
  return true;
}

其中runner变量表示当前工作的线程,在FutureTask.run()执行中会设置。

cancel的原理就是如果mayInterruptIfRunning是true,则会中断工作线程,最终都会设置任务执行状态的标记为:INTERRUPTEDINTERRUPTING 或者 CANCELLED,这样在get时候就会如同report(s)方法所示抛出CancellationException。

问题:一个FutureTask执行了cancle(true),那么这个任务是立即终止的吗?
答案是不一定。因为即使参数是true,调用线程的t.interrupt()方法,也只是设置了中断标记而已,或者让wait、sleep等方法抛出中断异常,任务中断逻辑还是需要自己代码实现

Hook钩子

线程池提供了三个钩子,这些钩子可以用来监控和观察线程执行情况。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

ThreadFactory

JDK提供了创建工作线程工厂的默认实现:Executors.defaultThreadFactory()

static class DefaultThreadFactory implements ThreadFactory {
  private static final AtomicInteger poolNumber = new AtomicInteger(1);
  private final ThreadGroup group;
  private final AtomicInteger threadNumber = new AtomicInteger(1);
  private final String namePrefix;

  DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
           "-thread-";
  }

  public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
    if (t.isDaemon())
      t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
      t.setPriority(Thread.NORM_PRIORITY);
    return t;
  }
}

BlockingQueue排队阻塞队列:workQueue

阻塞队列通常有三种策略可以选择:

1. 无界阻塞队列UnBounded queues

通常使用LinkedBlockingQueue,当工作线程达到corePoolSize后,所有新来的任务都会进入阻塞队列,所以 无界阻塞队列导致maximumPoolSize参数无效,在使用无界队列时,我们必须考虑到无界会带来的资源衰竭内存耗尽问题。

2. 有界阻塞队列Bounded queues

可以有效防止资源耗尽,但是我们必须在队列容量和线程池的大小上作一个很好的权衡,从而提高效率。同时,我们必须考虑到线程个数和容量到达零界点时丢弃任务的策略。

3. 直接传递Direct handoffs

不保存任何元素的阻塞队列,仅仅是将任务传递给工作线程,比如SynchronousQueue。通常我们使用无界的maximumPoolSizes来创建工作线程以防止任务丢弃,所以我们必须注意过多的线程带来的系统瓶颈。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是一个计划执行任务的线程池,它继承ThreadPoolExecutor类,实现了ScheduledExecutorService接口。

ScheduledExecutorService

ScheduledExecutorService定义了一系列计划执行的方法,包括隔一段时间执行。

public interface ScheduledExecutorService extends ExecutorService {

  public ScheduledFuture<?> schedule(Runnable command,
                     long delay, TimeUnit unit);

  public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                       long delay, TimeUnit unit);

  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                          long initialDelay,
                          long period,
                          TimeUnit unit);
                          
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                           long initialDelay,
                           long delay,
                           TimeUnit unit);

}

初始化

我们直接看构造函数,它调用了ThreadPoolExecutor的构造方法,初始了一个最大线程个数为最大值,阻塞队列为DelayedWorkQueue的线程池。

public ScheduledThreadPoolExecutor(int corePoolSize,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory, handler);
}

DelayedWorkQueue是内部基于堆实现的一个可排序阻塞队列。

ScheduledFutureTask

这里所有的任务不是FutureTask,而是ScheduledFutureTask,继承于FutureTask。在《并发(六)Collections》讲DelayQueue集合以及如何实现Delay接口中已经介绍过ScheduledFutureTask了。

Executors工厂类

ThreadPoolExecutor参数这么多,可以使用工厂来构建,JDK提供了一个工具类Executors来创建线程池。

newFixedThreadPool

固定大小nThreads的线程池,使用无界阻塞队列,最多只能有nThreads个线程执行,新来的任务将会排队。

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

只有一个工作线程的线程池,使用无界阻塞队列。和newFixedThreadPool(1)类似,但是最大的不同点是newSingleThreadExecutor返回的线程池无法重新配置。

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

有需要就会创建工作线程,也会重复利用已经空闲的线程。所以线程的个数最大是Integer.MAX_VALUE,采用传递队列SynchronousQueue。线程空闲时间超过60s就会被销毁,资源被回收,直至工作线程个数为0。

通常使用newCachedThreadPool执行耗时较短的任务,使用时必须考虑线程个数增加带来的可能问题。

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());
}

newWorkStealingPool

工作窃取线程池ForkJoinPool,会在下一篇Fork/Join文章中介绍。

public static ExecutorService newWorkStealingPool(int parallelism) {
  return new ForkJoinPool
    (parallelism,
     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     null, true);
}

newScheduledThreadPool

计划线程池。

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

总结

深入线程池原理才能真正用好线程池,推荐使用ThreadPoolExecutor的构造器来创建线程池,因为这样才会对你想使用的线程池有更好的理解。

Sayi avatar Oct 22 '18 17:10 Sayi