# 九、AQS

# 1. AQS 地位

事实上,不仅是 ReentrantLock 和 Semaphore,包括 CountDownLatch、ReentrantReadWriteLock 底层都用了一个共同的基类 AbstractQueuedSynchronizer,也就是 AQS。

image-20210324153807263

因为上面的那些协作类它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接使用。对于 ReentrantLock 和 Semaphore 而言就可以屏蔽很多细节,只关注它们自己的“业务逻辑”就可以了。

如果没有 AQS,就需要每个协作工具自己实现:

  1. 同步状态的原子性管理
  2. 线程的阻塞与解除阻塞
  3. 队列的管理

然而这些东西都是相同的,所以提取出一个 AQS 后要实现协作工具会简单很多。

# 2. AQS 三要素

# 2.1 state 状态

private volatile int state;

这里的 state 的含义会根据具体实现类的不同而不同,比如:

  • Semaphore 中,state 表示“剩余的许可证的数量”,当 state 为 0 的时候,说明许可证可以用完了。
  • CountDownLatch 中,state 表示**“还需要倒数的数量”**,当 state(count) 为 0 的时候,表示倒计时结束了,线程继续执行。
  • ReentrantLock 中,state 表示**“锁的占有情况,包括可重入计数”**,当 state 值为 0 的时候,标识着该 Lock 不被任何线程占有。

state 是 volatile 修饰的,会被并发的修改。所以所有修改 state 的方法都需要保证线程安全。

比如 getState()、setState() 以及 compareAndSetState() 操作来读取和更新这个状态,都需要保证线程安全,这些方法也都依赖于 j.u.c.atomic 包的支持。

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

# 2.2 控制线程抢锁和配合的 FIFO 队列

这个队列用来存放“等待的线程”,AQS 就是“排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。

image-20210324154809853

# 2.3 期望协作工具类去实现的获取/释放等重要方法

# 2.3.1 获取方法

获取操作会依赖 state 变量,经常会阻塞(比如获取不到锁的时候)。

  • 在 Sempahore 中,获取就是 acquire() 方法,作用就是获取一个许可证
  • 在 CountDownLatch 中,获取就是 await() 方法,作用是等待,直到倒数结束

# 2.3.2 释放方法

释放操作不会阻塞。

  • 在 Semaphore 中,释放就是 release() 方法,作用是释放一个许可证
  • 在 CountDownLatch 中,释放就是 countDown() 方法,作用是倒数一个数

# 3. CountDownLatch 和 AQS 的关系

内部有一个 Sync 类继承 AQS:

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

# 3.1 构造函数

  • count 就是 AQS 里面的 state
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
  	setState(count);
}

# 3.2 getCount()

  • 直接就是返回 AQS 的 state
public long getCount() {
    return sync.getCount();
}

int getCount() {
    return getState();
}

# 3.3 countDown()

public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
  	//判断是否需要将线程从 FIFO 队列中释放(count-- 后等于 0 的时候需要) 
  	if (tryReleaseShared(arg)) {
      	//返回 true 的话就做 doReleaseShared() 将线程从 FIFO 队列中释放
    		doReleaseShared();
    		return true;
  	}
  	return false;
}

tryReleaseShared(arg) 在 CountDownLatch 中实现的 Sync 类中有实现:

public class CountDownLatch {
  	//....
  
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
      
        protected boolean tryReleaseShared(int releases) {
            // 死循环一般都是为了自旋来做 CAS
            for (;;) {
              	//获取 count
                int c = getState();
              	//如果是 0,说明之前已经被释放过了,不需要 return true 去做 doReleaseShared()。
                if (c == 0)
                    return false;
              	//不是0,那就做 count--
                int nextc = c-1;
              	//进行 CAS,并返回当前 count 是否等于 0,等于 0 说明需要做 doReleaseShared() 释放线程
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
  
  	//....
}

如果 tryReleaseShared(args) 返回了 true,也是就当前 count = 0,倒计时结束了,那么还有调用 doReleaseShared() 将被 CountdownLatch 阻塞的线程从 FIFO 队列中释放:

private void doReleaseShared() {
  	//自旋
    for (;;) {
        Node h = head;
      	//循环释放所有被该 CountDownLatch 共享锁阻塞的线程
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

# 3.4 await()

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
  	//如果小于0,那就入队
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
  	//如果不小于0,那就说明当前 CountDownLatch 正常获取锁了。
}


tryAcquireShared(arg) 在 CountDownLatch 中实现的 Sync 类中有实现:

public class CountDownLatch {
  	//.....
  	
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
      
				protected int tryAcquireShared(int acquires) {
          	//如果 state(也就是count) 是 0,那就返回正数,如果不是0,就返回负数
            return (getState() == 0) ? 1 : -1;
        }
    }
  
  
  	//.....
}

所以这也就符合了 CountDownLatch 的功能了,线程 await() 到 count = 0 的时候就可以获得锁继续执行了。

CountDownLatch 中阻塞线程让其入队的过程 doAcquireSharedInterruptibly() 分析如下:

		/**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
      	//1. 将当前线程包装成一个 Node 结点(FIFO 队列就是用 Node 结点组成的)。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
              	//2. 将 Node 插入 FIFO 队列中
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //3. 然后将线程陷入阻塞状态
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

# 4. Semaphore 和 AQS 的关系

  • Semaphore 中有一个 Sync 类,Sync 继承了 AQS,而 AQS 中的 state 在 Semaphore 的意义就是剩余的许可证数量

    public class Semaphore implements java.io.Serializable {
        private static final long serialVersionUID = -3222578661600680210L;
        private final Sync sync;
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    

# 4.1 构造函数

  • permits(许可证)就是 AQS 中的 state:
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
 * Fair version 公平版本(非公平版本同理)
 */
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
		//直接调用 setState
    Sync(int permits) {
        setState(permits);
    }
}

# 4.2 acquire()

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

调用 acquireSharedInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
  	//1. 判断线程是否已经被中断了,如果中断了就抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
  	//2. 尝试获取锁,返回结果如果是负数的话,就将该线程放入 FIFO 阻塞队列
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
  			//如果尝试结果是整数的话,那么说明该线程获取锁成功。
}

tryAcquireShared(arg) 方法在 Semaphore 中的 FairSync 和 NonFairSync 中均有实现,这里只看公平版本:

/**
 * Fair version
 */
static final class FairSync extends Sync {

    protected int tryAcquireShared(int acquires) {
      	//死循环 -> 自旋 -> 做 CAS
        for (;;) {
          	//1. 判断当前线程是不是有队列前置
            //(非公平版本没有这一步)
            if (hasQueuedPredecessors())
                return -1;
          	//2. 获取当前许可证数量
            int available = getState();
          	//3. 减去当前线程申请是许可证数量
            int remaining = available - acquires;
          	//4. 如果许可证小于0 说明不够,直接返回负数,阻塞线程;如果如果许可证大于等于0则做 CAS 后返回当前的许可证数量
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

# 5. AQS 和 ReentrantLock 的关系

  • 在 ReentrantLock 中, AQS 的 state 就是当前锁被线程重入了多少次,0 表示当前锁没有被任何线程持有。

# 5.1 构造函数

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

# 5.2 unlock()

public void unlock() {
  	//其实就是执行 state--
    sync.release(1);
}

release():

public final boolean release(int arg) {
  	//1. 尝试去做 state--,并返回锁是否被释放
    if (tryRelease(arg)) {
        Node h = head;
      	//3. 通知其他线程锁已经被释放了
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
  	//2. 如果锁没有被释放则返回 false
    return false;
}

tryRelease() 在 ReentrantLock 中的 Sync 类有实现:

protected final boolean tryRelease(int releases) {
  	//1. 进行 state--
    int c = getState() - releases;
  	//2. 判断当前线程是否持有锁,没有锁的话就直接抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
  	//3. state = 0 说明该锁已被释放
    if (c == 0) {
        free = true;
      	//5. 将锁的持有者置为null
        setExclusiveOwnerThread(null);
    }
  	//6. 更新 state
    setState(c);
  	//7. 返回当前锁是否已经被释放
    return free;
}

# 5.3 lock()

public void lock() {
    sync.lock();
}

sync.lock(): 以非公平为例

static final class NonfairSync extends Sync {
    final void lock() {
      	//1. 进行 CAS,只有当前 state 是 0,也就是没有人持有该锁的时候,才去设置 state 为 1 表示持有该锁
        if (compareAndSetState(0, 1))
          	//2. 将当前线程设置为持有该锁的线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
          	//2. CAS 失败,表示锁已经被占有了
            acquire(1);
    }
}

sync.acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      	//① !tryAcquire(arg) 尝试获取锁失败
        //② acquireQueued(addWaiter(Node.EXCLUSIVE), arg)  竞争锁失败
      	//中断锁
        selfInterrupt();
}

先去 tryAcquire(arg) 尝试获取锁:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
  	//1. 获取当前线程
    final Thread current = Thread.currentThread();
  	//2. 获取当前 state
    int c = getState();
  	//3. 如果 state = 0,那就占有该锁。
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
  	//4. 如果当前线程本身就持有锁了
    else if (current == getExclusiveOwnerThread()) {
      	//5. 那就可重入,state + 1
        int nextc = c + acquires;
      	//特殊情况:锁可重入次数超过了 int 的范围,整形溢出,直接忽略这种奇葩情况
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
      	//6. 更新 state,也就是技术锁的重入次数
        setState(nextc);
      	//7. 返回 true
        return true;
    }
  	//8. 占不到锁,返回 false
    return false;
}

获取锁失败后,将当前线程封装成 Node 再 acquireQueued(final Node node, int arg) 。然后由于目前是 NonfairSync 非公平锁,那么它会尝试去竞争一下,看看有没有可能获取到锁,如果获取不到的话,再进入阻塞队列。

final boolean acquireQueued(final Node node, int arg) {
  	//竞争失败标志
    boolean failed = true;
    try {
      	//中断标志
        boolean interrupted = false;
      	//1. 循环尝试竞争
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
              	//2. 抢占成功
                failed = false;
              	//目前 interrupt 是 false,表示不中断
                return interrupted;
            }
          	//3. 竞争失败
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
      	//4. 竞争失败,就放弃获取锁,进入队列
        if (failed)
            cancelAcquire(node);
    }
}

# 6. 使用 AQS 自定义一个简化版 CounDownLatch

# AQS 用法
1. 写一个类,想好协作的逻辑,实现获取/释放方法。
2. 内部写一个 Sync 类继承 AbstractQueuedSynchronizer。
3. 根据是否独占来重写 tryAcquires 和 tryReleaseShared(int releases) 等方法,在之前写的获取/是否方法中调用 AQS 的 acquire/release 或者 shared 方法。

目标:一次倒数,count 从 1-> 0 后所有线程都出发。

/**
 * 描述:自己用 AQS 实现一个简单的线程协作器。
 *      一个简易版 CountDownLatch,当 count 从 1->0 的时候,唤醒所有线程。
 *
 * @author Hedon Wang
 * @create 2021-03-24 4:54 PM
 */
public class OneShotLatch {

    private final Sync sync = new Sync();

    /**
     * 获取
     */
    public void await(){
        //获取锁
        /*
            public final void acquireShared(int arg) {
                if (tryAcquireShared(arg) < 0)
                     doAcquireShared(arg);
            }
         */
        sync.acquireShared(0);
    }

    /**
     * 释放,打开门闩
     */
    public void countDown(){
        //释放锁
        /*
            public final boolean releaseShared(int arg) {
                if (tryReleaseShared(arg)) {
                     doReleaseShared();
                     return true;
                }
                return false;
            }
         */
        sync.releaseShared(0);
    }

    /**
     * 内部类 Sync 继承 AQS
     */
    private final class Sync extends AbstractQueuedSynchronizer{

        /**
         * 尝试获取锁
         *
         * return 正数:获得锁成功
         * return 负数:获得锁失败,阻塞
         */
        @Override
        protected int tryAcquireShared(int arg) {
            /**
             * 如果 state = 1,说明门闩已经被打开了,当前线程直接获得锁
             * 如果 state = 0,说明门闩没有被打开,会让当前线程去排队等待
             */
            return (getState() == 1) ? 1 : -1;
        }

        /**
         * 尝试释放锁
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            //简化版 CountDownLatch,直接将 state 置为 1,说明门闩已经被打开了,所以会唤醒所有被阻塞的线程
            setState(1);
            return true;
        }
    }


    /**
     * 测试
     */
    public static void main(String[] args) throws InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
      	
      	//阻塞10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "尝试获取 Latch...");
                    oneShotLatch.await();
                    System.out.println(Thread.currentThread().getName() + "获取 Latch 成功,继续执行");
                }
            }).start();
        }

        Thread.sleep(10000);
        //10秒后开门闩
        System.out.println("================================");
        System.out.println("==========主线程开门闩了==========");
        System.out.println("================================");
        oneShotLatch.countDown();
    }
}

结果:

Thread-0尝试获取 Latch...
Thread-2尝试获取 Latch...
Thread-4尝试获取 Latch...
Thread-3尝试获取 Latch...
Thread-1尝试获取 Latch...
Thread-7尝试获取 Latch...
Thread-6尝试获取 Latch...
Thread-5尝试获取 Latch...
Thread-9尝试获取 Latch...
Thread-8尝试获取 Latch...
================================
==========主线程开门闩了==========
================================
Thread-0获取 Latch 成功,继续执行
Thread-2获取 Latch 成功,继续执行
Thread-4获取 Latch 成功,继续执行
Thread-3获取 Latch 成功,继续执行
Thread-1获取 Latch 成功,继续执行
Thread-6获取 Latch 成功,继续执行
Thread-5获取 Latch 成功,继续执行
Thread-7获取 Latch 成功,继续执行
Thread-9获取 Latch 成功,继续执行
Thread-8获取 Latch 成功,继续执行
上次更新: 11/1/2021, 10:07:49 AM