并发(四)锁
我们已经知道,synchronized提供了强大的同步功能,volatile提供了稍微弱一点的同步机制,我们也可以通过ThreadLocal和原子类来避免多线程问题,这篇文章将会探讨Java提供更强大的显示锁:Lock,它更加的灵活,在讲解之前,我们先来读一读关于锁的术语:
-
共享锁、(排它锁、独占锁、互斥锁)
共享锁表示这个锁可以被多个线程共享,后文中提到的读锁就是一种共享锁,允许多个线程读。排它锁是只有一个线程能占有锁,不允许其它线程获得锁,后文中的写锁就是一个排它锁,不允许任何其它线程读和写,synchronized也是一个排它锁。 -
悲观锁、乐观锁
悲观锁是总会认为并发问题会存在,所以总是会加锁,比如synchronized。而乐观锁假设并发问题不存在,而在更新时才会比较是否发生了并发问题,所以不会加锁,比如CAS机制。 -
重入锁、非重入锁
当前线程可以多次获得同一个锁,就是重入锁,比如synchronized。 -
读写锁
读写锁可以分为读锁和写锁,允许多个线程共享读锁,但是当有写锁被独占时,不允许其它任何线程占有读锁或写锁。 -
偏向锁、自旋锁
这涉及到synchronized底层优化的一些概念,偏向锁升级到轻量级锁等。 -
公平锁、非公平锁
公平锁总是唤醒最先阻塞的线程,所以它是公平的。非公平锁则不一定按照阻塞顺序来唤醒。
锁的种类大多数之间并不是互斥的关系,它们分别从各自的角度去描述这个锁的功能,在数据并发层面,还有行锁、表锁等,在分布式环境下,有分布式锁的概念
接下来,我们将深入研究Lock及其实现。
Lock、Conditon
Lock是java.util.concurrent.locks包下一个最基本的接口Condition则提供了类似wait/notify机制的await和signal,我们先来看看类图:

正如接口所示,lock和unlock分别是加锁和释放锁,我们必须确保使用完锁会释放锁,典型的Lock使用习惯是通过try-finally来释放锁:
Lock lock = ...;
lock.lock();
try {
// access the resource protected by this lock
} finally {
lock.unlock();
}
问题:为什么有了synchronized,还需要显示锁Lock呢?它们之间有什么联系和区别? 我觉得可以分以下几:
- 它们都创造了Happens-before关系,所以是内存一致的
- synchronized是排它锁,而Lock可以实现共享锁,比如下文中的ReadWriteLock
- synchronized是非公平锁,而Lock可以实现公平锁,可以按照阻塞顺序唤醒线程
- synchronized要求对多个对象加锁的顺序和多个对象释放锁的顺序相反,即小括号匹配原则,而Lock允许加锁和释放锁在不同周期内,多个锁可以交错。
- Lock提供四种形式的加锁方式:不可中断lock()、可中断lockInterruptibly()、尝试获取tryLock()、定尝试tryLock(long time, TimeUnit unit)
- Lock可以提供更灵活的实现,比如非重入锁,死锁检测等
tryLock
接下来我们看看tryLock方法,提供了尝试获取锁和超时获取锁的方式,这个返回返回一个boolean值,true表示获得了锁,false表示未获得锁,我们可以使用这个特性来解决死锁问题:两个线程各自占有锁并且阻塞在获取对方锁上。我们可以在获取对方锁的时候使用tryLock,如果不能获得,则释放自己已经拥有的锁。下面是《The Java™ Tutorials》上面的一个鞠躬问题,一个人给另一个人鞠躬,直到另一个人也鞠躬才结束,假设两个人同时鞠躬,拥有了自己的锁,它们就会在等待对方的锁产生了阻塞。
static class Friend {
private final String name;
private final Lock lock = new ReentrantLock();
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public boolean impendingBow(Friend bower) {
Boolean myLock = false;
Boolean yourLock = false;
try {
myLock = lock.tryLock();
yourLock = bower.lock.tryLock();
} finally {
if (! (myLock && yourLock)) {
if (myLock) {
lock.unlock();
}
if (yourLock) {
bower.lock.unlock();
}
}
}
return myLock && yourLock;
}
public void bow(Friend bower) {
if (impendingBow(bower)) {
try {
System.out.format("%s: %s has"
+ " bowed to me!%n",
this.name, bower.getName());
bower.bowBack(this);
} finally {
lock.unlock();
bower.lock.unlock();
}
} else {
System.out.format("%s: %s started"
+ " to bow to me, but saw that"
+ " I was already bowing to"
+ " him.%n",
this.name, bower.getName());
}
}
public void bowBack(Friend bower) {
System.out.format("%s: %s has" +
" bowed back to me!%n",
this.name, bower.getName());
}
}
impendingBow方法利用tryLock解决了死锁问题,从这个例子再次看到了Lock的灵活性:lock和unlock是在不同方法里调用的。
lockInterruptibly()
void lockInterruptibly() throws InterruptedException;这个方法加锁阻塞会被Thread.interrupt打断,并且抛出InterruptedException异常,它是可中断的锁。
newCondition()
Condition提供了await/signal机制,正如同Object.wait/signal必须拥有对象锁(synchronized)一样,Condition是与Lock一起使用的,lock.newCondition()就是生成与当前锁关联的Condition对象,调用condition.await()方法前当前线程必须拥有lock锁。
我们来看看Condition在生产者和消费者问题上的使用,通过notFull发池子不满的信号和notEmpty来发送池子不为空的信号,JDK提供的ArrayBlockingQueue已经实现了这个功能:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
AQS:AbstractQueuedSynchronizer
Lock和Condition作为接口提供了一个很好的设计,如何实现这些接口是有难度的。
AQS编程
AbstractQueuedSynchronizer是一个实现锁和同步控制的框架,AQS是一个抽象类,实现了所有的线程阻塞和排队机制,这些实现依赖一个整型变量state,修改这个变量的值代表了同步时的获取锁和释放锁。AQS提供了protected方法供子类实现,子类通过 getState(), setState(int) 和 compareAndSetState(int, int)操作state变量的值来实现同步,我们来看这些protected方法:
/**
* The synchronization state.
*/
private volatile int state;
// 排它锁模下尝试通过state变量获取许可
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 排它锁模式下通过设置变量state的值来反映释放许可
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享锁模下尝试通过state变量获取许可,返回值-1表获取锁失败
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享锁模式下通过设置变量state的值来反映释放许可
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
AQS并没有实现Lock接口,它通常作为一个Lock实现类的辅助抽象类,通过AQS的实现来实现Lock接口的功能。我们来看JDK提供的一个利用AQS实现一个非重入锁的示例:
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 如果state值为0,则获得锁
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,设置state值为0
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() { return new ConditionObject(); }
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
Mutex是一个非重入锁,要求state的值是0时表示可以获取锁,是1时表示锁被占有。Sync是一个实现了AQS的内部类,tryAcquire实现了从0到1的过程,tryRelease实现了从1到0的过程。Mutex实现了Lock接口,所以功能的实现都是通过Sync这个内部类对象来实现。
AbstractQueuedSynchronizer.ConditionObject是一个AQS类中实现了Condition接口的内部类,下文中会介绍更多利用AQS实现的。
AQS实现
我们已经知道,AQS通过state来代表同步状态,接下来我们将简要介绍AQS源码是如何实排队和阻塞机制,如果不关心细节,可以忽略这一节。

通过类图,我们可以得出很重要的一个编码原则:如果不希望一个方法被修改,我们应该用final来修饰。 AQS里面除了几个可以重写的方法外,其余都是final的。
我们来看看acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果尝试获取许可失败,则会通过addWaiter将当前线程加到FIFO等待队列的队尾:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
其中if语句是为了尝试快速入队,如果入队失败,enq方法会通过loop循环方式接着尝试入队,我们再来看看acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果是队头元素,则会再次尝试tryAcquire获得许可,获取失败就会进行判断是否可以进入阻塞状态。shouldParkAfterFailedAcquire判断是否可以阻塞,parkAndCheckInterrupt就是具体阻塞方法,通过LockSuupport实现。
我们再来看看release方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
释放许可会在队首取出Node节点唤醒,看似是一个公平锁的实现,其实未必,因为会在唤醒后锁被其它线程抢占。
关于AQS还有很多细节,包括waitStatus和ConditionObject,具体源码不往下看了,我们注意到这个队列的节点是Node,它是一个双端双向队列,被称为CLH队列。
ReentrantLock
ReentrantLock是基于AQS实现的一个重入锁和排它锁,同时他还实现了公平锁和非公平锁。我们来看看其中的内部抽象类Sync:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// 略
}
每次重入都会累加state的值,Sync实现了tryRelease(releases)方法,释放相应数目的许可,当前线程不再独占锁时,通过方法setExclusiveOwnerThread设置独占线程为空,Sync还提供了一个抽象方法lock和包内方法nonfairTryAcquire供子类使用,我们来看看具体的公平锁和非公平锁的实现:
static final class NonfairSync extends Sync {
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
NonfairSync实现的是非公平锁,tryAcquire方法调用的是nonfairTryAcquire方法,如果state未被线程占用,则当前线程占用锁,如果是被当前线程重入占用,则累加state值。基本的lock方法实现是调用acquire(1)方法,正如公平锁FairSync的lock方法实现那样,但是NonfairSync尝试调用nonfairTryAcquire之前,通过一个CAS操作希望当前线程抢先占用锁,即这一行代码:
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
FairSync就是一个公平的实现,当有许可可用,state为0时,许多线程尝试获取许可的时候,它会通过hasQueuedPredecessors()来判断是否有比当前线程等待更久的线程,如果没有,则尝试执行当前线程,hasQueuedPredecessors源码如下,当前线程和队首线程不一致则不允许抢占锁:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
接下来ReentrantLock的所有Lock接口方法的实现就可以依靠这些辅助同步类了:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public final boolean isFair() {
return sync instanceof FairSync;
}
// 略
}
通过源码看到,tryLock都会在非公平模式下获取锁。
ReadWriteLock 和 ReentrantReadWriteLock
ReadWriteLock拥有两个锁,一个读锁,一个写锁,如果没有写锁,读锁可以被多个线程持有,写锁是一个排它锁。
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
我们来看看JDK提供的一个使用示例:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
我们先占有读锁去读缓存,如果无效,我们会释放读锁,占用写锁(写锁是独占锁,必须先释放读锁),操作完写之后,释放写锁,然后重新占有读锁,操作缓存数据,最后释放读锁。
ReentrantReadWriteLock是一个重入的读写锁,内部类Sync是实现了AQS的抽象类,state整型变量只能存储一个同步状态,读写锁需要两个同步状态,这里就用到了位操作,state的高16位无符号数用来记录读锁的计数,低16位无符号数用来记录写锁的计数:
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
我们直接看看其中一个获取读锁的方法:
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
获取读锁前,总是会判断排它锁的计数是否为0。ReentrantReadWriteLock是通过内部维护的两个Lock对象来实现读写锁,它们都是通过Sync实现了具体功能,深入学习的可以看源码。
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
StampedLock
JDK1.8以后还提供了 StampedLock 来支持读的场景很频繁,写的场景比较少的一种读写锁优化。
总结
本文我们学习了更加强大的Lock锁,在实现自己的锁之前,需要深入研究AQS的机制,使用好优化的锁可以增加我们程序的性能。