
Concurrency (6): Collections


This chapter focuses on the thread-safe collection classes, all of which live in the java.util.concurrent package.

Blocking queues: BlockingQueue and BlockingDeque

Compared with an ordinary queue, a BlockingQueue provides two additional behaviors:

  1. When the queue is empty, take() blocks until the queue becomes non-empty.
  2. When the queue is full, put() blocks until the queue has free space again.

For background on queues, see the earlier article 《Collections(四)Queue》.

BlockingQueue is commonly used in the producer-consumer model. For example, Google's Volley framework buffers network requests in a blocking queue and has several worker threads take requests from it to execute; likewise, once a thread pool exceeds its core size it queues tasks in a blocking queue, and worker threads (existing or newly started) take tasks from the queue to run. The following code is the example from the JDK documentation, where the producer calls put and the consumers call take:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
}

class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
}

The BlockingDeque interface extends BlockingQueue and describes a blocking double-ended queue.

ArrayBlockingQueue

A bounded blocking queue backed by an array. Its constructors require the capacity to be specified; the fair parameter selects the fairness policy of the internal lock:

public ArrayBlockingQueue(int capacity) {
  this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}

The chapter on locks discussed implementing the producer-consumer model with Lock and Condition: notEmpty signals that the queue is no longer empty, and notFull signals that the queue is not yet full. Here is the core source of ArrayBlockingQueue's take():

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == 0)
      notEmpty.await();
    return dequeue();
  } finally {
    lock.unlock();
  }
}
private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  notFull.signal();
  return x;
}

When the queue is empty, notEmpty.await() blocks the current thread; every dequeue() that removes an element signals that the queue is no longer full via notFull.signal(). Note that await() must be guarded by a while loop so the condition is re-checked after the thread wakes up.

Now let's look at the source of put():

public void put(E e) throws InterruptedException {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == items.length)
      notFull.await();
    enqueue(e);
  } finally {
    lock.unlock();
  }
}
private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  items[putIndex] = x;
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  notEmpty.signal();
}

When the queue is full, notFull.await() inside the while guard blocks the current thread; every enqueue signals that the queue is no longer empty.

LinkedBlockingQueue

An optionally bounded blocking queue backed by a linked list. The capacity can be set in the constructor and defaults to Integer.MAX_VALUE:

public LinkedBlockingQueue() {
  this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
}

Note: unlike ArrayBlockingQueue, which implements take and put with a single lock, LinkedBlockingQueue uses two locks, which improves throughput. Writing correct two-lock code by hand requires care.

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

Because two locks are used, a plain element count could be updated incorrectly when both ends are modified concurrently, so the count is stored in an atomic type:

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

With that in place, here is the source of take():

public E take() throws InterruptedException {
  E x;
  int c = -1;
  final AtomicInteger count = this.count;
  final ReentrantLock takeLock = this.takeLock;
  takeLock.lockInterruptibly();
  try {
    while (count.get() == 0) {
      notEmpty.await();
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
      notEmpty.signal();
  } finally {
    takeLock.unlock();
  }
  if (c == capacity)
    signalNotFull();
  return x;
}
private void signalNotFull() {
  final ReentrantLock putLock = this.putLock;
  putLock.lock();
  try {
    notFull.signal();
  } finally {
    putLock.unlock();
  }
}

This code differs from the ArrayBlockingQueue implementation in one place:

if (c > 1)
  notEmpty.signal();

It means that if the queue held more than one element before the dequeue, elements remain after it, so the current take wakes up the next thread waiting to take. Instead of relying on a later put() to call notEmpty.signal(), the take side checks the count ahead of time and cascades the signal itself.

The source of put():

public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  // Note: convention in all put/take/etc is to preset local var
  // holding count negative to indicate failure unless set.
  int c = -1;
  Node<E> node = new Node<E>(e);
  final ReentrantLock putLock = this.putLock;
  final AtomicInteger count = this.count;
  putLock.lockInterruptibly();
  try {
    /*
     * Note that count is used in wait guard even though it is
     * not protected by lock. This works because count can
     * only decrease at this point (all other puts are shut
     * out by lock), and we (or some other waiting put) are
     * signalled if it ever changes from capacity. Similarly
     * for all other uses of count in other wait guards.
     */
    while (count.get() == capacity) {
      notFull.await();
    }
    enqueue(node);
    c = count.getAndIncrement();
    if (c + 1 < capacity)
      notFull.signal();
  } finally {
    putLock.unlock();
  }
  if (c == 0)
    signalNotEmpty();
}

With its two locks and linked-list storage, LinkedBlockingQueue offers better performance and should usually be the first blocking queue to reach for.

PriorityBlockingQueue

An unbounded blocking queue based on a heap, with a mechanism similar to PriorityQueue. Because it is unbounded, take() never needs to send a notFull signal; see the source for details.

The Volley framework stores network requests in a PriorityBlockingQueue so that high-priority requests are processed first.
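
As a quick sketch of the behaviour (the Request class and its priority values below are made up for illustration), elements come out of the queue in priority order rather than insertion order:

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
  // Hypothetical request type: a lower priority value means more urgent
  static class Request implements Comparable<Request> {
    final String name;
    final int priority;
    Request(String name, int priority) { this.name = name; this.priority = priority; }
    public int compareTo(Request other) { return Integer.compare(priority, other.priority); }
  }

  public static void main(String[] args) throws InterruptedException {
    PriorityBlockingQueue<Request> queue = new PriorityBlockingQueue<>();
    queue.put(new Request("low", 10));       // put never blocks: the queue is unbounded
    queue.put(new Request("high", 1));
    queue.put(new Request("normal", 5));
    while (!queue.isEmpty()) {
      System.out.println(queue.take().name); // prints high, normal, low
    }
  }
}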

SynchronousQueue

This queue never stores any element: its capacity is always 0 and it has no field that holds elements. It acts purely as a hand-off, passing an element from the put thread to the take thread.

  • take() blocks until another thread puts an element with put()
  • put() blocks until another thread removes an element with take()
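
A minimal sketch of this hand-off behaviour (the extra thread and the sleep exist only for demonstration) shows that put() does not return until another thread has taken the element:

import java.util.concurrent.SynchronousQueue;

public class HandOffDemo {
  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> queue = new SynchronousQueue<>();

    Thread consumer = new Thread(() -> {
      try {
        Thread.sleep(1000);                            // simulate a slow consumer
        System.out.println("took: " + queue.take());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    long start = System.currentTimeMillis();
    queue.put("hello");                                // blocks until the consumer calls take()
    System.out.println("put returned after " + (System.currentTimeMillis() - start) + " ms");
  }
}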

Let's look at its core code:

public E take() throws InterruptedException {
  E e = transferer.transfer(null, false, 0);
  if (e != null)
    return e;
  Thread.interrupted();
  throw new InterruptedException();
}
public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  if (transferer.transfer(e, false, 0) == null) {
    Thread.interrupted();
    throw new InterruptedException();
  }
}

Both methods delegate to a key object, Transferer, which is the heart of the hand-off.

/**
 * Shared internal API for dual stacks and queues.
 */
abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt --
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract E transfer(E e, boolean timed, long nanos);
}
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

Internally, SynchronousQueue provides TransferQueue for fair mode and TransferStack for non-fair mode; they implement dual queue and dual stack algorithms, whose details are not covered here.

DelayQueue

DelayQueue is a priority blocking queue that orders its elements by delay, backed internally by a PriorityQueue. When an element is requested, the caller blocks until the delay of the head element has expired. DelayQueue suits scenarios where work must be performed only after a delay: the operations are placed into the delay queue and worker threads take them out as they expire.

Elements of a delay queue must be both delayed and comparable, i.e. they implement the java.util.concurrent.Delayed interface.

/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 * <p>An implementation of this interface must define a
 * {@code compareTo} method that provides an ordering consistent with
 * its {@code getDelay} method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {

  /**
   * Returns the remaining delay associated with this object, in the
   * given time unit.
   *
   * @param unit the time unit
   * @return the remaining delay; zero or negative values indicate
   * that the delay has already elapsed
   */
  long getDelay(TimeUnit unit);
}

Implementing Delayed correctly is not entirely trivial: compareTo must be consistent with getDelay so that the element with the shortest remaining delay has the highest priority and is taken first. Let's look at the source of ScheduledThreadPoolExecutor.ScheduledFutureTask<V>, which represents a scheduled task and implements Delayed:

private class ScheduledFutureTask<V>
    extends FutureTask<V> implements RunnableScheduledFuture<V> {
  /** Sequence number to break ties FIFO */
  private final long sequenceNumber;

  /** The time the task is enabled to execute in nanoTime units */
  private long time;

   ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.sequenceNumber = sequencer.getAndIncrement();
  }
  public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
  }

  public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
        return -1;
      else if (diff > 0)
        return 1;
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  }
  // remaining methods omitted
}

This is a typical implementation of the Delayed interface: the constructor records the point in time when the delay expires, getDelay compares that time with the current time to compute the remaining delay, and compareTo orders tasks by expiry time (or via getDelay) so that the task with the shortest delay sits at the head of the queue. The sequenceNumber field breaks ties between tasks with the same expiry time in FIFO order.

Knowing how to implement Delayed, we can dig into the DelayQueue source:

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    for (;;) {
      E first = q.peek();
      if (first == null)
        available.await();
      else {
        long delay = first.getDelay(NANOSECONDS);
        if (delay <= 0)
          return q.poll();
        first = null; // don't retain ref while waiting
        if (leader != null)
          available.await();
        else {
          Thread thisThread = Thread.currentThread();
          leader = thisThread;
          try {
            available.awaitNanos(delay);
          } finally {
            if (leader == thisThread)
              leader = null;
          }
        }
      }
    }
  } finally {
    if (leader == null && q.peek() != null)
      available.signal();
    lock.unlock();
  }
}

Without going through every detail, a few points stand out:

  • When the head element has not yet expired, the thread blocks for the remaining delay via available.awaitNanos(delay); once the delay elapses, one of the (possibly several) blocked threads removes the head element.
  • If an element expired before any thread asked for it, the next thread to call take() removes the already-expired head immediately:

long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
  return q.poll();

NOTE: DelayQueue can also be used to implement cache expiry, where the delay of each entry is its remaining time-to-live.
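
As a sketch of that idea (the CacheEntry class and the TTL values are hypothetical), each cache entry implements Delayed with its remaining time-to-live, so take() only hands back entries that have already expired:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class CacheExpiryDemo {
  // Hypothetical cache entry: expires ttlMillis after creation
  static class CacheEntry implements Delayed {
    final String key;
    final long expireAtNanos;

    CacheEntry(String key, long ttlMillis) {
      this.key = key;
      this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
    }

    public long getDelay(TimeUnit unit) {
      return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    public int compareTo(Delayed other) {
      // ordering consistent with getDelay: shorter remaining delay first
      return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<CacheEntry> expiryQueue = new DelayQueue<>();
    expiryQueue.put(new CacheEntry("b", 2000));
    expiryQueue.put(new CacheEntry("a", 1000));
    // take() blocks until the head entry has expired; "a" comes out first
    System.out.println("expired: " + expiryQueue.take().key);
    System.out.println("expired: " + expiryQueue.take().key);
  }
}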

LinkedTransferQueue

A blocking queue with transfer semantics; its internals are not covered here.
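
Still, its key API can be sketched (the extra thread and the timings below are only for demonstration): unlike put(), which enqueues and returns at once, transfer() blocks until a consumer has actually received the element:

import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
  public static void main(String[] args) throws InterruptedException {
    LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

    Thread consumer = new Thread(() -> {
      try {
        Thread.sleep(500);                                 // simulate a slow consumer
        System.out.println("received: " + queue.take());
        System.out.println("received: " + queue.take());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    queue.put("via put");           // returns immediately, the element just sits in the queue
    queue.transfer("via transfer"); // blocks until the consumer has taken this element
    System.out.println("transfer returned");
  }
}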

LinkedBlockingDeque

An optionally bounded double-ended blocking queue backed by a linked list. Its mechanism is straightforward; see the source for details.
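
A brief sketch of the double-ended API (the capacity and values are arbitrary): elements can be inserted and removed, with blocking, at either end:

import java.util.concurrent.LinkedBlockingDeque;

public class DequeDemo {
  public static void main(String[] args) throws InterruptedException {
    // bounded deque with capacity 2; the put/take methods block when full/empty
    LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(2);
    deque.putLast("task-1");
    deque.putFirst("urgent");              // jumps ahead of task-1
    System.out.println(deque.takeFirst()); // urgent
    System.out.println(deque.takeLast());  // task-1
  }
}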

Concurrent collections

The Java collections framework also provides concurrent implementations of several collections, whose iterators never throw ConcurrentModificationException. Unlike the Collections.synchronizedXXX wrappers, which simply rely on synchronized, the ConcurrentXXX classes are genuinely concurrent and allow multiple threads to operate at the same time; CAS plays an important role in these implementations.

Some of the concurrent implementations are based on skip lists (SkipList), an alternative to balanced trees.
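
For example, ConcurrentSkipListMap keeps its keys sorted like TreeMap while supporting concurrent access; a minimal sketch (keys and values are arbitrary):

import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListDemo {
  public static void main(String[] args) {
    ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
    map.put(30, "c");
    map.put(10, "a");
    map.put(20, "b");
    System.out.println(map.firstKey());  // 10 -- keys stay sorted
    System.out.println(map.headMap(30)); // {10=a, 20=b}
  }
}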

Others are based on copy-on-write: a write copies the backing array, applies the modification to the copy, and finally publishes the new array by assigning it over the old one. Here is the implementation of CopyOnWriteArrayList.add(E):

public boolean add(E e) {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    Object[] elements = getArray();
    int len = elements.length;
    Object[] newElements = Arrays.copyOf(elements, len + 1);
    newElements[len] = e;
    setArray(newElements);
    return true;
  } finally {
    lock.unlock();
  }
}

Arrays.copyOf creates the new array, and setArray publishes it once the write is done.
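
One consequence of the copy-on-write strategy is that an iterator works on the snapshot of the array taken when it was created, so modifying the list while iterating does not throw ConcurrentModificationException. A minimal sketch:

import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
  public static void main(String[] args) {
    CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
    list.add("a");
    list.add("b");
    for (String s : list) {
      list.add(s + "-copy");   // allowed: the iterator still sees the old snapshot
      System.out.println(s);   // prints only "a" and "b"
    }
    System.out.println(list);  // [a, b, a-copy, b-copy]
  }
}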

  • ConcurrentLinkedDeque: a non-blocking concurrent deque based on linked nodes.

  • ConcurrentLinkedQueue: a non-blocking concurrent queue based on linked nodes.

  • ConcurrentHashMap: the concurrent counterpart of HashMap (see the sketch after this list).

  • ConcurrentSkipListMap: the concurrent counterpart of TreeMap, based on skip lists.

  • ConcurrentSkipListSet: the concurrent counterpart of TreeSet.

  • CopyOnWriteArrayList: a copy-on-write concurrent ArrayList.

  • CopyOnWriteArraySet: a copy-on-write Set.
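
As promised in the list above, here is a small ConcurrentHashMap sketch (the word-counting use case is only illustrative) showing the atomic compound operations that make it safer than locking a plain HashMap externally:

import java.util.concurrent.ConcurrentHashMap;

public class WordCountDemo {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
    String[] words = {"a", "b", "a"};
    for (String word : words) {
      counts.merge(word, 1, Integer::sum); // merge is atomic, so concurrent increments are never lost
    }
    System.out.println(counts);            // e.g. {a=2, b=1}
    counts.putIfAbsent("c", 0);            // another atomic compound operation
  }
}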

Summary

Unbounded blocking queues must be used with extra care, because an unbounded backlog can exhaust memory.
