public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
ReentrantReadWriteLock默认使用的还是公平锁。
ReadLock的lock
1 2 3
public void lock() { sync.acquireShared(1);//共享锁 }
acquireShared调用的是AQS的acquireShared实现:
1 2 3 4 5
//Acquires in shared mode, ignoring interrupts. 共享模式下去获取 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail.//写锁存在,直接失败 * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); //映射第一步:1. If write lock held by another thread, fail. if (exclusiveCount(c) != 0 &&//返回低16位记录的数量,写锁的数量不等于0 getExclusiveOwnerThread() != current) // 排它锁的持有者不是当前线程,直接失败 return -1; int r = sharedCount(c);//程序走到这,意味着没有写锁,读取高16位 if (!readerShouldBlock() && // 判断阻塞队列前边是否有线程,公平锁和非公平锁的实现会不一样 r < MAX_COUNT && //不管是读锁还是写锁,锁的数量是有限制的。 compareAndSetState(c, c + SHARED_UNIT)) {//CAS操作,增加数量 if (r == 0) {//之前没有读锁 firstReader = current;//当前线程是第一个获取读锁的线程 firstReaderHoldCount = 1;//读锁的数量 } else if (firstReader == current) { //当前线程就是firstReader firstReaderHoldCount++; //持有数量+1 } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //CAS的补救措施的一些操作 return fullTryAcquireShared(current); }
WriteLock的lock
WriteLock的lock的lock实现:
1 2 3
public void lock() { sync.acquire(1); }
AQS的实现:
1 2 3 4 5 6
//Acquires in exclusive mode, ignoring interrupts. 排他模式下使用 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 读的线程数量非0,写的线程数量非0,直接失败,因为写锁是互斥的 * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c);//写锁的数量 if (c != 0) {//排它锁被线程持有 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread())//持有线程是其他线程,不是自己 return false; //直接返回false if (w + exclusiveCount(acquires) > MAX_COUNT) //写锁数量超过锁的数量最大值 throw new Error("Maximum lock count exceeded"); //直接异常 // Reentrant acquire setState(c + acquires); // 可重入的获取(已经有线程持有锁,并且持有锁的线程就是自己,那么就是重入性的获取锁) return true; } // if (writerShouldBlock() || // 是否需要阻塞(排队),公平锁和非公平锁的实现是不一样的 !compareAndSetState(c, c + acquires))//CAS失败,意味着存在一个竞争的获取写锁的线程 return false; setExclusiveOwnerThread(current);//获取排它锁成功,设置锁的持有者是当前线程 return true; }
//Attempts to release this lock. 尝试释放锁 //If the number of readers is now zero then the lock is made available for write lock attempts // 读锁的数量是0,那么写锁操作可以尝试去获取锁 public void unlock() { sync.releaseShared(1); }
releaseShared的实现在AQS里边
1 2 3 4 5 6 7 8 9
// Releases in shared mode. 共享模式下释放锁 public final boolean releaseShared(int arg) { //读锁和写锁都被释放掉 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {// 当前线程是第一个获取读锁的线程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1)//firstReaderHoldCount是第一个获取读锁线程重入的次数 firstReader = null;//只有一个读线程(自己本身),firstReader置空 else firstReaderHoldCount--;//重入的情况,减一 } else { //非第一个读取者 HoldCounter rh = cachedHoldCounter;//一个HoldCounter代表一个读线程的实例,记录了读线程持有锁的数量 if (rh == null || rh.tid != getThreadId(current))//如果当前线程不是最后一个获取读锁的线程 rh = readHolds.get();//从ThreadLocal里边获取当前线程持有锁的数量计数器 int count = rh.count;//得到当前线程持有锁的数量 if (count <= 1) { readHolds.remove();//从ThreadLocal里边删除 if (count <= 0)//二次判断 throw unmatchedUnlockException(); } --rh.count;//当前线程持有锁的数量减一 } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc))//CAS替换 // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //对于读锁时无关紧要的 //状态为0的时候,让写和读有一个条件获取到写锁或者读锁,这里状态为0,会存在读锁和写锁的一块竞争 return nextc == 0; } }
//Release action for shared mode 共享模式下释放锁的操作 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) { Node h = head; if (h != null && h != tail) {//队列里边是有元素的 int ws = h.waitStatus;//等待状态 if (ws == Node.SIGNAL) {//线程正在等待得到通知 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);//唤醒当前节点的下一个节点的线程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } //Wakes up node's successor, if one exists. //如果节点存在,那么将其唤醒 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //当前节点的后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒线程 LockSupport.unpark(s.thread); }
//Attempts to release this lock. //尝试释放锁 public void unlock() { sync.release(1); }
sync.release(1)的实现在AQS里边:
1 2 3 4 5 6 7 8 9 10 11
//Releases in exclusive mode. // 在排他模式下释放锁 public final boolean release(int arg) { if (tryRelease(arg)) {//释放锁成功 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒后继节点 return true; } return false; }
tryRelease的实现在ReentrantReadWriteLock里边:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
protected final boolean tryRelease(int releases) { //当前排它锁持有者的线程是不是当前线程,不是直接异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases;//释放锁,状态减一 boolean free = exclusiveCount(nextc) == 0;//exclusiveCount得到排它锁的数量,如果等于,意味着没有任何线程持有排它锁 if (free) setExclusiveOwnerThread(null);//将持有排他锁的线程清空 setState(nextc);//保存状态 return free; }
protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); }
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; //中断的一些判断 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //构造一个等待节点加入到等待队列 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //包装了一个线程 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ 将等待时间最长的线程移出去,放到阻塞队列 public final void signal() { //是不是排它锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //获取到阻塞队列的第一个等待者,如果不是空的,证明这个阻塞队列头部有一个node if (first != null) doSignal(first); }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { // do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ 将一个node从等待队列当中移动到阻塞队列当中 final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } // 具体的节点的处理 /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }