sayi.github.com icon indicating copy to clipboard operation
sayi.github.com copied to clipboard

并发(四)锁

Open Sayi opened this issue 7 years ago • 0 comments

我们已经知道,synchronized提供了强大的同步功能,volatile提供了稍微弱一点的同步机制,我们也可以通过ThreadLocal和原子类来避免多线程问题,这篇文章将会探讨Java提供更强大的显示锁:Lock,它更加的灵活,在讲解之前,我们先来读一读关于锁的术语:

  • 共享锁、(排它锁、独占锁、互斥锁)
    共享锁表示这个锁可以被多个线程共享,后文中提到的读锁就是一种共享锁,允许多个线程读。排它锁是只有一个线程能占有锁,不允许其它线程获得锁,后文中的写锁就是一个排它锁,不允许任何其它线程读和写,synchronized也是一个排它锁。

  • 悲观锁、乐观锁
    悲观锁是总会认为并发问题会存在,所以总是会加锁,比如synchronized。而乐观锁假设并发问题不存在,而在更新时才会比较是否发生了并发问题,所以不会加锁,比如CAS机制。

  • 重入锁、非重入锁
    当前线程可以多次获得同一个锁,就是重入锁,比如synchronized。

  • 读写锁
    读写锁可以分为读锁和写锁,允许多个线程共享读锁,但是当有写锁被独占时,不允许其它任何线程占有读锁或写锁。

  • 偏向锁、自旋锁
    这涉及到synchronized底层优化的一些概念,偏向锁升级到轻量级锁等。

  • 公平锁、非公平锁
    公平锁总是唤醒最先阻塞的线程,所以它是公平的。非公平锁则不一定按照阻塞顺序来唤醒。

锁的种类大多数之间并不是互斥的关系,它们分别从各自的角度去描述这个锁的功能,在数据并发层面,还有行锁、表锁等,在分布式环境下,有分布式锁的概念

接下来,我们将深入研究Lock及其实现。

Lock、Conditon

Lock是java.util.concurrent.locks包下一个最基本的接口Condition则提供了类似wait/notify机制的await和signal,我们先来看看类图:

image

正如接口所示,lock和unlock分别是加锁和释放锁,我们必须确保使用完锁会释放锁,典型的Lock使用习惯是通过try-finally来释放锁:

Lock lock = ...;
lock.lock();
try {
  // access the resource protected by this lock
} finally {
  lock.unlock();
}

问题:为什么有了synchronized,还需要显示锁Lock呢?它们之间有什么联系和区别? 我觉得可以分以下几:

  1. 它们都创造了Happens-before关系,所以是内存一致的
  2. synchronized是排它锁,而Lock可以实现共享锁,比如下文中的ReadWriteLock
  3. synchronized是非公平锁,而Lock可以实现公平锁,可以按照阻塞顺序唤醒线程
  4. synchronized要求对多个对象加锁的顺序和多个对象释放锁的顺序相反,即小括号匹配原则,而Lock允许加锁和释放锁在不同周期内,多个锁可以交错。
  5. Lock提供四种形式的加锁方式:不可中断lock()、可中断lockInterruptibly()、尝试获取tryLock()、定尝试tryLock(long time, TimeUnit unit)
  6. Lock可以提供更灵活的实现,比如非重入锁,死锁检测等

tryLock

接下来我们看看tryLock方法,提供了尝试获取锁和超时获取锁的方式,这个返回返回一个boolean值,true表示获得了锁,false表示未获得锁,我们可以使用这个特性来解决死锁问题:两个线程各自占有锁并且阻塞在获取对方锁上。我们可以在获取对方锁的时候使用tryLock,如果不能获得,则释放自己已经拥有的锁。下面是《The Java™ Tutorials》上面的一个鞠躬问题,一个人给另一个人鞠躬,直到另一个人也鞠躬才结束,假设两个人同时鞠躬,拥有了自己的锁,它们就会在等待对方的锁产生了阻塞。

static class Friend {
  private final String name;
  private final Lock lock = new ReentrantLock();

  public Friend(String name) {
    this.name = name;
  }

  public String getName() {
    return this.name;
  }

  public boolean impendingBow(Friend bower) {
    Boolean myLock = false;
    Boolean yourLock = false;
    try {
      myLock = lock.tryLock();
      yourLock = bower.lock.tryLock();
    } finally {
      if (! (myLock && yourLock)) {
        if (myLock) {
          lock.unlock();
        }
        if (yourLock) {
          bower.lock.unlock();
        }
      }
    }
    return myLock && yourLock;
  }
    
  public void bow(Friend bower) {
    if (impendingBow(bower)) {
      try {
        System.out.format("%s: %s has"
          + " bowed to me!%n", 
          this.name, bower.getName());
        bower.bowBack(this);
      } finally {
        lock.unlock();
        bower.lock.unlock();
      }
    } else {
      System.out.format("%s: %s started"
        + " to bow to me, but saw that"
        + " I was already bowing to"
        + " him.%n",
        this.name, bower.getName());
    }
  }

  public void bowBack(Friend bower) {
    System.out.format("%s: %s has" +
      " bowed back to me!%n",
      this.name, bower.getName());
  }
}

impendingBow方法利用tryLock解决了死锁问题,从这个例子再次看到了Lock的灵活性:lock和unlock是在不同方法里调用的。

lockInterruptibly()

void lockInterruptibly() throws InterruptedException;这个方法加锁阻塞会被Thread.interrupt打断,并且抛出InterruptedException异常,它是可中断的锁。

newCondition()

Condition提供了await/signal机制,正如同Object.wait/signal必须拥有对象锁(synchronized)一样,Condition是与Lock一起使用的,lock.newCondition()就是生成与当前锁关联的Condition对象,调用condition.await()方法前当前线程必须拥有lock锁。

我们来看看Condition在生产者和消费者问题上的使用,通过notFull发池子不满的信号和notEmpty来发送池子不为空的信号,JDK提供的ArrayBlockingQueue已经实现了这个功能:

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
}

AQS:AbstractQueuedSynchronizer

Lock和Condition作为接口提供了一个很好的设计,如何实现这些接口是有难度的。

AQS编程

AbstractQueuedSynchronizer是一个实现锁和同步控制的框架,AQS是一个抽象类,实现了所有的线程阻塞和排队机制,这些实现依赖一个整型变量state,修改这个变量的值代表了同步时的获取锁和释放锁。AQS提供了protected方法供子类实现,子类通过 getState(), setState(int) 和 compareAndSetState(int, int)操作state变量的值来实现同步,我们来看这些protected方法:

/**
 * The synchronization state.
 */
private volatile int state;

// 排它锁模下尝试通过state变量获取许可
protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}

// 排它锁模式下通过设置变量state的值来反映释放许可
protected boolean tryRelease(int arg) {
  throw new UnsupportedOperationException();
}

// 共享锁模下尝试通过state变量获取许可,返回值-1表获取锁失败
protected int tryAcquireShared(int arg) {
  throw new UnsupportedOperationException();
}

// 共享锁模式下通过设置变量state的值来反映释放许可
protected boolean tryReleaseShared(int arg) {
  throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
  throw new UnsupportedOperationException();
}

AQS并没有实现Lock接口,它通常作为一个Lock实现类的辅助抽象类,通过AQS的实现来实现Lock接口的功能。我们来看JDK提供的一个利用AQS实现一个非重入锁的示例:

 class Mutex implements Lock, java.io.Serializable {

   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Reports whether in locked state
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }

     // 如果state值为0,则获得锁
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // 释放锁,设置state值为0
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Provides a Condition
     Condition newCondition() { return new ConditionObject(); }

   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

Mutex是一个非重入锁,要求state的值是0时表示可以获取锁,是1时表示锁被占有。Sync是一个实现了AQS的内部类,tryAcquire实现了从0到1的过程,tryRelease实现了从1到0的过程。Mutex实现了Lock接口,所以功能的实现都是通过Sync这个内部类对象来实现。

AbstractQueuedSynchronizer.ConditionObject是一个AQS类中实现了Condition接口的内部类,下文中会介绍更多利用AQS实现的。

AQS实现

我们已经知道,AQS通过state来代表同步状态,接下来我们将简要介绍AQS源码是如何实排队和阻塞机制,如果不关心细节,可以忽略这一节。

image

通过类图,我们可以得出很重要的一个编码原则:如果不希望一个方法被修改,我们应该用final来修饰。 AQS里面除了几个可以重写的方法外,其余都是final的。

我们来看看acquire方法:

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

如果尝试获取许可失败,则会通过addWaiter将当前线程加到FIFO等待队列的队尾:

private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

其中if语句是为了尝试快速入队,如果入队失败,enq方法会通过loop循环方式接着尝试入队,我们再来看看acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

如果是队头元素,则会再次尝试tryAcquire获得许可,获取失败就会进行判断是否可以进入阻塞状态。shouldParkAfterFailedAcquire判断是否可以阻塞,parkAndCheckInterrupt就是具体阻塞方法,通过LockSuupport实现。

我们再来看看release方法:

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

private void unparkSuccessor(Node node) {
  /*
   * If status is negative (i.e., possibly needing signal) try
   * to clear in anticipation of signalling.  It is OK if this
   * fails or if status is changed by waiting thread.
   */
  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

  /*
   * Thread to unpark is held in successor, which is normally
   * just the next node.  But if cancelled or apparently null,
   * traverse backwards from tail to find the actual
   * non-cancelled successor.
   */
  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread);
}

释放许可会在队首取出Node节点唤醒,看似是一个公平锁的实现,其实未必,因为会在唤醒后锁被其它线程抢占。

关于AQS还有很多细节,包括waitStatus和ConditionObject,具体源码不往下看了,我们注意到这个队列的节点是Node,它是一个双端双向队列,被称为CLH队列。

ReentrantLock

ReentrantLock是基于AQS实现的一个重入锁和排它锁,同时他还实现了公平锁和非公平锁。我们来看看其中的内部抽象类Sync:

abstract static class Sync extends AbstractQueuedSynchronizer {
  private static final long serialVersionUID = -5179523762034025860L;

  /**
   * Performs {@link Lock#lock}. The main reason for subclassing
   * is to allow fast path for nonfair version.
   */
  abstract void lock();

  /**
   * Performs non-fair tryLock.  tryAcquire is implemented in
   * subclasses, but both need nonfair try for trylock method.
   */
  final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
      if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
      }
    }
    else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) // overflow
        throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
    }
    return false;
  }

  protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
      free = true;
      setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
  }

  protected final boolean isHeldExclusively() {
    // While we must in general read state before owner,
    // we don't need to do so to check if current thread is owner
    return getExclusiveOwnerThread() == Thread.currentThread();
  }

  final ConditionObject newCondition() {
    return new ConditionObject();
  }

  // 略
}

每次重入都会累加state的值,Sync实现了tryRelease(releases)方法,释放相应数目的许可,当前线程不再独占锁时,通过方法setExclusiveOwnerThread设置独占线程为空,Sync还提供了一个抽象方法lock和包内方法nonfairTryAcquire供子类使用,我们来看看具体的公平锁和非公平锁的实现:

static final class NonfairSync extends Sync {
  /**
   * Performs lock.  Try immediate barge, backing up to normal
   * acquire on failure.
   */
  final void lock() {
    if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
    else
      acquire(1);
  }

  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
  }
}

static final class FairSync extends Sync {

  final void lock() {
    acquire(1);
  }

  /**
   * Fair version of tryAcquire.  Don't grant access unless
   * recursive call or no waiters or is first.
   */
  protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
      if (!hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
      }
    }
    else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
    }
    return false;
  }
}

NonfairSync实现的是非公平锁,tryAcquire方法调用的是nonfairTryAcquire方法,如果state未被线程占用,则当前线程占用锁,如果是被当前线程重入占用,则累加state值。基本的lock方法实现是调用acquire(1)方法,正如公平锁FairSync的lock方法实现那样,但是NonfairSync尝试调用nonfairTryAcquire之前,通过一个CAS操作希望当前线程抢先占用锁,即这一行代码:

if (compareAndSetState(0, 1))
  setExclusiveOwnerThread(Thread.currentThread());

FairSync就是一个公平的实现,当有许可可用,state为0时,许多线程尝试获取许可的时候,它会通过hasQueuedPredecessors()来判断是否有比当前线程等待更久的线程,如果没有,则尝试执行当前线程,hasQueuedPredecessors源码如下,当前线程和队首线程不一致则不允许抢占锁:

public final boolean hasQueuedPredecessors() {
  // The correctness of this depends on head being initialized
  // before tail and on head.next being accurate if the current
  // thread is first in queue.
  Node t = tail; // Read fields in reverse initialization order
  Node h = head;
  Node s;
  return h != t &&
    ((s = h.next) == null || s.thread != Thread.currentThread());
}

接下来ReentrantLock的所有Lock接口方法的实现就可以依靠这些辅助同步类了:

public class ReentrantLock implements Lock, java.io.Serializable {
  private final Sync sync;
  public ReentrantLock() {
    sync = new NonfairSync();
  }
  public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
  }
  public void lock() {
    sync.lock();
  }
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
      throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
  public void unlock() {
    sync.release(1);
  }
  public Condition newCondition() {
    return sync.newCondition();
  }
  public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
  }
  public final boolean isFair() {
    return sync instanceof FairSync;
  }
  // 略
}

通过源码看到,tryLock都会在非公平模式下获取锁。

ReadWriteLock 和 ReentrantReadWriteLock

ReadWriteLock拥有两个锁,一个读锁,一个写锁,如果没有写锁,读锁可以被多个线程持有,写锁是一个排它锁。

public interface ReadWriteLock {
  /**
   * Returns the lock used for reading.
   *
   * @return the lock used for reading
   */
  Lock readLock();

  /**
   * Returns the lock used for writing.
   *
   * @return the lock used for writing
   */
  Lock writeLock();
}

我们来看看JDK提供的一个使用示例:

class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
       // Must release read lock before acquiring write lock
       rwl.readLock().unlock();
       rwl.writeLock().lock();
       try {
         // Recheck state because another thread might have
         // acquired write lock and changed state before we did.
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
         // Downgrade by acquiring read lock before releasing write lock
         rwl.readLock().lock();
       } finally {
         rwl.writeLock().unlock(); // Unlock write, still hold read
       }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

我们先占有读锁去读缓存,如果无效,我们会释放读锁,占用写锁(写锁是独占锁,必须先释放读锁),操作完写之后,释放写锁,然后重新占有读锁,操作缓存数据,最后释放读锁。

ReentrantReadWriteLock是一个重入的读写锁,内部类Sync是实现了AQS的抽象类,state整型变量只能存储一个同步状态,读写锁需要两个同步状态,这里就用到了位操作,state的高16位无符号数用来记录读锁的计数,低16位无符号数用来记录写锁的计数:

/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT  = (1 << SHARED_SHIFT);
static final int MAX_COUNT    = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

我们直接看看其中一个获取读锁的方法:

final boolean tryReadLock() {
  Thread current = Thread.currentThread();
  for (;;) {
    int c = getState();
    if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
      return false;
    int r = sharedCount(c);
    if (r == MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
    if (compareAndSetState(c, c + SHARED_UNIT)) {
      if (r == 0) {
        firstReader = current;
        firstReaderHoldCount = 1;
      } else if (firstReader == current) {
        firstReaderHoldCount++;
      } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
          cachedHoldCounter = rh = readHolds.get();
        else if (rh.count == 0)
          readHolds.set(rh);
        rh.count++;
      }
      return true;
    }
  }
}

获取读锁前,总是会判断排它锁的计数是否为0。ReentrantReadWriteLock是通过内部维护的两个Lock对象来实现读写锁,它们都是通过Sync实现了具体功能,深入学习的可以看源码。

/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;

StampedLock

JDK1.8以后还提供了 StampedLock 来支持读的场景很频繁,写的场景比较少的一种读写锁优化。

总结

本文我们学习了更加强大的Lock锁,在实现自己的锁之前,需要深入研究AQS的机制,使用好优化的锁可以增加我们程序的性能。

Sayi avatar Oct 16 '18 09:10 Sayi