concurrency(12)可重入读写锁对于AQS的实现分析

ReentrantReadWriteLock

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MyTest2 {
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void method(){
try{
lock.readLock().lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("method");
}finally {
lock.readLock().unlock();
}
}

public static void main(String[] args) {
MyTest2 myTest2 = new MyTest2();
IntStream.range(0,10).forEach(i -> new Thread(myTest2::method).start());
}
}

输出:
method
method
method
method
method
method
method
method
method
method

如果将 lock.readLock().lock();换成 lock.writeLock().lock();再去运行,就会看出读锁和写锁的区别,
readLock打印输出结果的速度很快,因为读锁不需要阻塞是共享的,writeLock打印结果的结果很慢,有输出的时间顺序,因为写锁是互斥的,需要挨个执行。

ReentrantReadWriteLock分析

ReentrantReadWriteLock岁state的使用是划分了2部分去使用,state的高16位表示的是读锁,低1位表示的是写锁。
ReentrantReadWriteLock的构造器:

1
2
3
4
5
6
7
8
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

ReentrantReadWriteLock默认使用的还是公平锁。

ReadLock的lock
1
2
3
public void lock() {
sync.acquireShared(1);//共享锁
}

acquireShared调用的是AQS的acquireShared实现:

1
2
3
4
5
//Acquires in shared mode, ignoring interrupts.  共享模式下去获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

tryAcquireShared的实现是子类(Sync)的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.//写锁存在,直接失败
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
*
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//映射第一步:1. If write lock held by another thread, fail.
if (exclusiveCount(c) != 0 &&//返回低16位记录的数量,写锁的数量不等于0
getExclusiveOwnerThread() != current) // 排它锁的持有者不是当前线程,直接失败
return -1;
int r = sharedCount(c);//程序走到这,意味着没有写锁,读取高16位
if (!readerShouldBlock() && // 判断阻塞队列前边是否有线程,公平锁和非公平锁的实现会不一样
r < MAX_COUNT && //不管是读锁还是写锁,锁的数量是有限制的。
compareAndSetState(c, c + SHARED_UNIT)) {//CAS操作,增加数量
if (r == 0) {//之前没有读锁
firstReader = current;//当前线程是第一个获取读锁的线程
firstReaderHoldCount = 1;//读锁的数量
} else if (firstReader == current) { //当前线程就是firstReader
firstReaderHoldCount++; //持有数量+1
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//CAS的补救措施的一些操作
return fullTryAcquireShared(current);
}
WriteLock的lock

WriteLock的lock的lock实现:

1
2
3
public void lock() {
sync.acquire(1);
}

AQS的实现:

1
2
3
4
5
6
//Acquires in exclusive mode, ignoring interrupts.   排他模式下使用
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire在子类ReentrantReadWriteLock.Sync实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 读的线程数量非0,写的线程数量非0,直接失败,因为写锁是互斥的
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);//写锁的数量
if (c != 0) {//排它锁被线程持有
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())//持有线程是其他线程,不是自己
return false; //直接返回false
if (w + exclusiveCount(acquires) > MAX_COUNT) //写锁数量超过锁的数量最大值
throw new Error("Maximum lock count exceeded"); //直接异常
// Reentrant acquire
setState(c + acquires); // 可重入的获取(已经有线程持有锁,并且持有锁的线程就是自己,那么就是重入性的获取锁)
return true;
}
//
if (writerShouldBlock() || // 是否需要阻塞(排队),公平锁和非公平锁的实现是不一样的
!compareAndSetState(c, c + acquires))//CAS失败,意味着存在一个竞争的获取写锁的线程
return false;
setExclusiveOwnerThread(current);//获取排它锁成功,设置锁的持有者是当前线程
return true;
}
ReentrantReadWriteLock的lock操作逻辑

读锁:

  1. 在获取读锁时,会尝试判断当前对象是否拥有了写锁,如果已经拥有,则直接失败。
  2. 如果没有写锁,,就表示当前对象没有排它锁,则当前线程会尝试给对象加锁。
  3. 如果当前线程已经持有了该对象的锁,那么直接将读锁数量+1.

写锁:

  1. 在获取写锁时,会尝试判断当前对象是否拥有了锁(读锁和写锁),如果已经拥有并且持有的线程并非当前线程,直接失败。
  2. 如果当前对象没有被加锁,,那么写锁就会为了当前对象上锁,并且将写锁的个数+1.
  3. 将当前对象的排它锁线程持有者设为自己。
ReadLock的unlock

ReentrantReadWriteLock读锁的unlock:

1
2
3
4
5
6
//Attempts to release this lock. 尝试释放锁
//If the number of readers is now zero then the lock is made available for write lock attempts
// 读锁的数量是0,那么写锁操作可以尝试去获取锁
public void unlock() {
sync.releaseShared(1);
}

releaseShared的实现在AQS里边

1
2
3
4
5
6
7
8
9
// Releases in shared mode.   共享模式下释放锁
public final boolean releaseShared(int arg) {
//读锁和写锁都被释放掉
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared的实现在ReentrantReadWriteLock的Sync实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {// 当前线程是第一个获取读锁的线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)//firstReaderHoldCount是第一个获取读锁线程重入的次数
firstReader = null;//只有一个读线程(自己本身),firstReader置空
else
firstReaderHoldCount--;//重入的情况,减一
} else {
//非第一个读取者
HoldCounter rh = cachedHoldCounter;//一个HoldCounter代表一个读线程的实例,记录了读线程持有锁的数量
if (rh == null || rh.tid != getThreadId(current))//如果当前线程不是最后一个获取读锁的线程
rh = readHolds.get();//从ThreadLocal里边获取当前线程持有锁的数量计数器
int count = rh.count;//得到当前线程持有锁的数量
if (count <= 1) {
readHolds.remove();//从ThreadLocal里边删除
if (count <= 0)//二次判断
throw unmatchedUnlockException();
}
--rh.count;//当前线程持有锁的数量减一
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))//CAS替换
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
//对于读锁时无关紧要的
//状态为0的时候,让写和读有一个条件获取到写锁或者读锁,这里状态为0,会存在读锁和写锁的一块竞争
return nextc == 0;
}
}

如果读锁和写锁都被释放掉,那么就会进入到AQS的doReleaseShared:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//Release action for shared mode 共享模式下释放锁的操作
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/

for (;;) {
Node h = head;
if (h != null && h != tail) {//队列里边是有元素的
int ws = h.waitStatus;//等待状态
if (ws == Node.SIGNAL) {//线程正在等待得到通知
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒当前节点的下一个节点的线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
//Wakes up node's successor, if one exists.
//如果节点存在,那么将其唤醒
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//当前节点的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}

LockSupport

LockSupport封装了线程的一些等待或者唤醒的操作,中间会调用 sun.misc.Unsafe 的native代码是实现,上一节的锁的释放,唤醒后继节点的动作就是使用了LockSupport.本次我们看一下方法,本次我们看一下unpark的openjdk的底层C++实现。
具体实现代码:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/6f33e450999c/src/os/linux/vm/os_linux.cpp
park方法会调用pthread_mutex_trylock,这个是内核实现。
因此不管是synchronized关键字还是AQS都是需要切换到内核态。

WriteLock的unlock

实现:

1
2
3
4
5
//Attempts to release this lock.
//尝试释放锁
public void unlock() {
sync.release(1);
}

sync.release(1)的实现在AQS里边:

1
2
3
4
5
6
7
8
9
10
11
//Releases in exclusive mode.
// 在排他模式下释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁成功
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒后继节点
return true;
}
return false;
}

tryRelease的实现在ReentrantReadWriteLock里边:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryRelease(int releases) {
//当前排它锁持有者的线程是不是当前线程,不是直接异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;//释放锁,状态减一
boolean free = exclusiveCount(nextc) == 0;//exclusiveCount得到排它锁的数量,如果等于,意味着没有任何线程持有排它锁
if (free)
setExclusiveOwnerThread(null);//将持有排他锁的线程清空
setState(nextc);//保存状态
return free;
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

AQS的锁获取直观图

aqs1.png
上面的知识点介绍的是阻塞队列的部分,接下来是条件队列的介绍。

条件队列

ReentrantReadWriteLock里边的读锁和写锁的条件队列都是使用的Condition来实现的。
condition.png

AQS设计哲学解析及synchronized思想变迁

Conditiond.await()实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
//中断的一些判断
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//构造一个等待节点加入到等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//包装了一个线程
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

condition1.png
当我们newCondition()的时候就会在AQS上创建一个等待队列,当一个线程调用condation.await()的时候就会在等待队列里边创建一个Node节点。

signal方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
将等待时间最长的线程移出去,放到阻塞队列
public final void signal() {
//是不是排它锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//获取到阻塞队列的第一个等待者,如果不是空的,证明这个阻塞队列头部有一个node
if (first != null)
doSignal(first);
}


/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
//
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
将一个node从等待队列当中移动到阻塞队列当中
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
// 具体的节点的处理
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

await就是在等待队列中创建一个节点,signal就是将等待队列里边的节点移动到阻塞队列。
condition2.png

AQS和synchronized关键字之间的关系

synchronized:

  1. synchronized关键字在底层的c++实现中,存在2个重要的数据结构(集合),WaitSet、EntryList。
  2. WaitSet中存放的是调用了Object的wait方法的线程对象(被封装成了c++的Node对象)
  3. EntryList中存在是陷入阻塞状态、需要获取monitor的那些线程对象。
  4. 进入到EntryList后,该线程依然需要与其他线程争抢Monitor对象。
  5. 如果争抢到,就表示该线程获取到了对象的锁,它就可以排他方式执行对应的同步代码。

AQS

  1. AQS中存在两种队列,分别是Condition对象的条件队列,以及AQS本身的阻塞队列
  2. 这两个队列中的每一个对象都是Node实例(里面封装了线程对象)
  3. 当位于Condition条件队列中的线程被其他线程signal后,该线程就会从条件队列移动到AQS的阻塞队列中。
  4. 位于AQS阻塞队列中的Node对象本质上都是由一个双向链表来构成的。
  5. 在获取到AQS锁时,这些进入到阻塞队列中的线程会按照在队列中的排序先后尝试获取。
  6. 当AQS阻塞队列中的线程获取到锁后,就表示该线程已经可以正常执行了
  7. 陷入到阻塞状态的线程,依然需要进入到操作系统的内核态,进入阻塞(park方法实现)