# 九、AQS
# 1. AQS 地位
事实上,不仅是 ReentrantLock 和 Semaphore,包括 CountDownLatch、ReentrantReadWriteLock 底层都用了一个共同的基类 AbstractQueuedSynchronizer
,也就是 AQS。
因为上面的那些协作类它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接使用。对于 ReentrantLock 和 Semaphore 而言就可以屏蔽很多细节,只关注它们自己的“业务逻辑”就可以了。
如果没有 AQS,就需要每个协作工具自己实现:
- 同步状态的原子性管理
- 线程的阻塞与解除阻塞
- 队列的管理
然而这些东西都是相同的,所以提取出一个 AQS 后要实现协作工具会简单很多。
# 2. AQS 三要素
# 2.1 state 状态
private volatile int state;
这里的 state 的含义会根据具体实现类的不同而不同,比如:
- Semaphore 中,state 表示“剩余的许可证的数量”,当 state 为 0 的时候,说明许可证可以用完了。
- CountDownLatch 中,state 表示**“还需要倒数的数量”**,当 state(count) 为 0 的时候,表示倒计时结束了,线程继续执行。
- ReentrantLock 中,state 表示**“锁的占有情况,包括可重入计数”**,当 state 值为 0 的时候,标识着该 Lock 不被任何线程占有。
state 是 volatile 修饰的,会被并发的修改。所以所有修改 state 的方法都需要保证线程安全。
比如 getState()、setState() 以及 compareAndSetState() 操作来读取和更新这个状态,都需要保证线程安全,这些方法也都依赖于 j.u.c.atomic 包的支持。
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
# 2.2 控制线程抢锁和配合的 FIFO 队列
这个队列用来存放“等待的线程”,AQS 就是“排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。
# 2.3 期望协作工具类去实现的获取/释放等重要方法
# 2.3.1 获取方法
获取操作会依赖 state
变量,经常会阻塞(比如获取不到锁的时候)。
- 在 Sempahore 中,获取就是
acquire()
方法,作用就是获取一个许可证。 - 在 CountDownLatch 中,获取就是
await()
方法,作用是等待,直到倒数结束。
# 2.3.2 释放方法
释放操作不会阻塞。
- 在 Semaphore 中,释放就是
release()
方法,作用是释放一个许可证。 - 在 CountDownLatch 中,释放就是
countDown()
方法,作用是倒数一个数。
# 3. CountDownLatch 和 AQS 的关系
内部有一个 Sync 类继承 AQS:
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
# 3.1 构造函数
count
就是 AQS 里面的state
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
# 3.2 getCount()
- 直接就是返回 AQS 的 state
public long getCount() {
return sync.getCount();
}
int getCount() {
return getState();
}
# 3.3 countDown()
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//判断是否需要将线程从 FIFO 队列中释放(count-- 后等于 0 的时候需要)
if (tryReleaseShared(arg)) {
//返回 true 的话就做 doReleaseShared() 将线程从 FIFO 队列中释放
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(arg) 在 CountDownLatch 中实现的 Sync 类中有实现:
public class CountDownLatch {
//....
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
protected boolean tryReleaseShared(int releases) {
// 死循环一般都是为了自旋来做 CAS
for (;;) {
//获取 count
int c = getState();
//如果是 0,说明之前已经被释放过了,不需要 return true 去做 doReleaseShared()。
if (c == 0)
return false;
//不是0,那就做 count--
int nextc = c-1;
//进行 CAS,并返回当前 count 是否等于 0,等于 0 说明需要做 doReleaseShared() 释放线程
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
//....
}
如果 tryReleaseShared(args) 返回了 true,也是就当前 count = 0,倒计时结束了,那么还有调用 doReleaseShared()
将被 CountdownLatch 阻塞的线程从 FIFO 队列中释放:
private void doReleaseShared() {
//自旋
for (;;) {
Node h = head;
//循环释放所有被该 CountDownLatch 共享锁阻塞的线程
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
# 3.4 await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//如果小于0,那就入队
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
//如果不小于0,那就说明当前 CountDownLatch 正常获取锁了。
}
tryAcquireShared(arg)
在 CountDownLatch 中实现的 Sync 类中有实现:
public class CountDownLatch {
//.....
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int acquires) {
//如果 state(也就是count) 是 0,那就返回正数,如果不是0,就返回负数
return (getState() == 0) ? 1 : -1;
}
}
//.....
}
所以这也就符合了 CountDownLatch 的功能了,线程 await() 到 count = 0 的时候就可以获得锁继续执行了。
CountDownLatch 中阻塞线程让其入队的过程 doAcquireSharedInterruptibly()
分析如下:
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//1. 将当前线程包装成一个 Node 结点(FIFO 队列就是用 Node 结点组成的)。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//2. 将 Node 插入 FIFO 队列中
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
//3. 然后将线程陷入阻塞状态
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
# 4. Semaphore 和 AQS 的关系
Semaphore 中有一个 Sync 类,Sync 继承了 AQS,而 AQS 中的 state 在 Semaphore 的意义就是剩余的许可证数量。
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L;
# 4.1 构造函数
- permits(许可证)就是 AQS 中的 state:
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* Fair version 公平版本(非公平版本同理)
*/
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
//直接调用 setState
Sync(int permits) {
setState(permits);
}
}
# 4.2 acquire()
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
调用 acquireSharedInterruptibly(int arg)
:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//1. 判断线程是否已经被中断了,如果中断了就抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//2. 尝试获取锁,返回结果如果是负数的话,就将该线程放入 FIFO 阻塞队列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
//如果尝试结果是整数的话,那么说明该线程获取锁成功。
}
tryAcquireShared(arg) 方法在 Semaphore 中的 FairSync 和 NonFairSync 中均有实现,这里只看公平版本:
/**
* Fair version
*/
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
//死循环 -> 自旋 -> 做 CAS
for (;;) {
//1. 判断当前线程是不是有队列前置
//(非公平版本没有这一步)
if (hasQueuedPredecessors())
return -1;
//2. 获取当前许可证数量
int available = getState();
//3. 减去当前线程申请是许可证数量
int remaining = available - acquires;
//4. 如果许可证小于0 说明不够,直接返回负数,阻塞线程;如果如果许可证大于等于0则做 CAS 后返回当前的许可证数量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
# 5. AQS 和 ReentrantLock 的关系
- 在 ReentrantLock 中, AQS 的 state 就是当前锁被线程重入了多少次,0 表示当前锁没有被任何线程持有。
# 5.1 构造函数
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
# 5.2 unlock()
public void unlock() {
//其实就是执行 state--
sync.release(1);
}
release():
public final boolean release(int arg) {
//1. 尝试去做 state--,并返回锁是否被释放
if (tryRelease(arg)) {
Node h = head;
//3. 通知其他线程锁已经被释放了
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
//2. 如果锁没有被释放则返回 false
return false;
}
tryRelease() 在 ReentrantLock 中的 Sync 类有实现:
protected final boolean tryRelease(int releases) {
//1. 进行 state--
int c = getState() - releases;
//2. 判断当前线程是否持有锁,没有锁的话就直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//3. state = 0 说明该锁已被释放
if (c == 0) {
free = true;
//5. 将锁的持有者置为null
setExclusiveOwnerThread(null);
}
//6. 更新 state
setState(c);
//7. 返回当前锁是否已经被释放
return free;
}
# 5.3 lock()
public void lock() {
sync.lock();
}
sync.lock(): 以非公平为例
static final class NonfairSync extends Sync {
final void lock() {
//1. 进行 CAS,只有当前 state 是 0,也就是没有人持有该锁的时候,才去设置 state 为 1 表示持有该锁
if (compareAndSetState(0, 1))
//2. 将当前线程设置为持有该锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//2. CAS 失败,表示锁已经被占有了
acquire(1);
}
}
sync.acquire():
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//① !tryAcquire(arg) 尝试获取锁失败
//② acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 竞争锁失败
//中断锁
selfInterrupt();
}
先去 tryAcquire(arg) 尝试获取锁:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//1. 获取当前线程
final Thread current = Thread.currentThread();
//2. 获取当前 state
int c = getState();
//3. 如果 state = 0,那就占有该锁。
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//4. 如果当前线程本身就持有锁了
else if (current == getExclusiveOwnerThread()) {
//5. 那就可重入,state + 1
int nextc = c + acquires;
//特殊情况:锁可重入次数超过了 int 的范围,整形溢出,直接忽略这种奇葩情况
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//6. 更新 state,也就是技术锁的重入次数
setState(nextc);
//7. 返回 true
return true;
}
//8. 占不到锁,返回 false
return false;
}
获取锁失败后,将当前线程封装成 Node 再 acquireQueued(final Node node, int arg) 。然后由于目前是 NonfairSync 非公平锁,那么它会尝试去竞争一下,看看有没有可能获取到锁,如果获取不到的话,再进入阻塞队列。
final boolean acquireQueued(final Node node, int arg) {
//竞争失败标志
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
//1. 循环尝试竞争
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
//2. 抢占成功
failed = false;
//目前 interrupt 是 false,表示不中断
return interrupted;
}
//3. 竞争失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//4. 竞争失败,就放弃获取锁,进入队列
if (failed)
cancelAcquire(node);
}
}
# 6. 使用 AQS 自定义一个简化版 CounDownLatch
# AQS 用法
1. 写一个类,想好协作的逻辑,实现获取/释放方法。
2. 内部写一个 Sync 类继承 AbstractQueuedSynchronizer。
3. 根据是否独占来重写 tryAcquires 和 tryReleaseShared(int releases) 等方法,在之前写的获取/是否方法中调用 AQS 的 acquire/release 或者 shared 方法。
目标:一次倒数,count 从 1-> 0 后所有线程都出发。
/**
* 描述:自己用 AQS 实现一个简单的线程协作器。
* 一个简易版 CountDownLatch,当 count 从 1->0 的时候,唤醒所有线程。
*
* @author Hedon Wang
* @create 2021-03-24 4:54 PM
*/
public class OneShotLatch {
private final Sync sync = new Sync();
/**
* 获取
*/
public void await(){
//获取锁
/*
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
*/
sync.acquireShared(0);
}
/**
* 释放,打开门闩
*/
public void countDown(){
//释放锁
/*
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
*/
sync.releaseShared(0);
}
/**
* 内部类 Sync 继承 AQS
*/
private final class Sync extends AbstractQueuedSynchronizer{
/**
* 尝试获取锁
*
* return 正数:获得锁成功
* return 负数:获得锁失败,阻塞
*/
@Override
protected int tryAcquireShared(int arg) {
/**
* 如果 state = 1,说明门闩已经被打开了,当前线程直接获得锁
* 如果 state = 0,说明门闩没有被打开,会让当前线程去排队等待
*/
return (getState() == 1) ? 1 : -1;
}
/**
* 尝试释放锁
*/
@Override
protected boolean tryReleaseShared(int arg) {
//简化版 CountDownLatch,直接将 state 置为 1,说明门闩已经被打开了,所以会唤醒所有被阻塞的线程
setState(1);
return true;
}
}
/**
* 测试
*/
public static void main(String[] args) throws InterruptedException {
OneShotLatch oneShotLatch = new OneShotLatch();
//阻塞10个线程
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "尝试获取 Latch...");
oneShotLatch.await();
System.out.println(Thread.currentThread().getName() + "获取 Latch 成功,继续执行");
}
}).start();
}
Thread.sleep(10000);
//10秒后开门闩
System.out.println("================================");
System.out.println("==========主线程开门闩了==========");
System.out.println("================================");
oneShotLatch.countDown();
}
}
结果:
Thread-0尝试获取 Latch...
Thread-2尝试获取 Latch...
Thread-4尝试获取 Latch...
Thread-3尝试获取 Latch...
Thread-1尝试获取 Latch...
Thread-7尝试获取 Latch...
Thread-6尝试获取 Latch...
Thread-5尝试获取 Latch...
Thread-9尝试获取 Latch...
Thread-8尝试获取 Latch...
================================
==========主线程开门闩了==========
================================
Thread-0获取 Latch 成功,继续执行
Thread-2获取 Latch 成功,继续执行
Thread-4获取 Latch 成功,继续执行
Thread-3获取 Latch 成功,继续执行
Thread-1获取 Latch 成功,继续执行
Thread-6获取 Latch 成功,继续执行
Thread-5获取 Latch 成功,继续执行
Thread-7获取 Latch 成功,继续执行
Thread-9获取 Latch 成功,继续执行
Thread-8获取 Latch 成功,继续执行