# 八、控制并发流程
# 1. CountDownLatch 倒计时门闩
# 1.1 作用
- 倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作。
例如:
- 购物拼团
- 人满发车
# 1.2 主要方法
public CountDownLatch(int count)
/** 唯一的构造器,count 是需要倒数的数 */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
public void await():线程会被挂起,它会等到 count = 0 才继续执行
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
void countDown():将 count 值减 1,直到 0 时,等待的线程会被唤醒。
public void countDown() { sync.releaseShared(1); }
# 1.3 两种用法
一个线程等待多个线程执行完毕,再继续自己的工作。
/** * 演示:工产中的质检,5个工人检查,所有人都认为通过才通过 * * @author Hedon Wang * @create 2021-03-23 10:56 AM */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(5); ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { int number = i+1; Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep(new Random().nextInt(1000) + 1000); System.out.println(number + "号工人完成质检"); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } }; service.submit(runnable); } System.out.println("等待5个人检查完成...."); //等待count=0 countDownLatch.await(); System.out.println("5个人都已经质检完毕!!!"); } }
多个线程等待某一个线程的信号,同时开始执行。
- 竞赛中多位运动员等待裁判鸣枪后同时出发。
/** * 演示:5名运动员等待裁判一声令下,所有人同时开始跑步 * * @author Hedon Wang * @create 2021-03-23 10:56 AM */ public class CountDownLatchDemo2 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { int number = 1 + i; Runnable runnable = new Runnable() { @Override public void run() { System.out.println(number + "号运动员准备完毕,等待发令枪.."); try { latch.await(); System.out.println(number + "号运动员开始跑步"); } catch (InterruptedException e) { e.printStackTrace(); } } }; service.submit(runnable); } Thread.sleep(5000); System.out.println("发令枪响,比赛开始!!"); //下令 latch.countDown(); } }
# 1.4 注意点
- CountDownLatch 不能重用。如果想重用,可以新建一个 CountDownLatch 或者用
CyclicBarrier
。
# 2. Semaphore 信号量
# 2.1 作用
- Semaphore 用来限制或管理数量有限的资源的使用情况。
- 信号量的作用是维护一个“许可证”的计数,线程可以“获取”许可证,那信号量剩余的许可证就 -1。当信号量锁拥有的许可证数量为 0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证。
# 2.2 使用流程
- 初始化 Semaphore 并指定许可证数量。
- 在需要许可证的代码前加 acquire() 或者 acquireUninterruptibly() 方法。
- 当任务执行完成后,要调用 release() 来释放许可证。
# 2.3 主要方法
public Semaphore(int permits, boolean fair)
- 参数1:许可证数量
- 参数2:是否公平
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
acquire():请求许可证(可以指定数量),可以响应中断。
acquireUninterruptibly():请求许可证,不能响应中断。
tryAcquire():尝试获取许可证,失败则干别的事。
release():释放许可证(可以指定数量)。
# 2.4 示例
/**
* 演示:50个线程执行100个任务,但是用信号量来限制每次只能3个任务同时进行。
*
* @author Hedon Wang
* @create 2021-03-23 11:24 AM
*/
public class SemaphoreDemo1 {
//许可证中心
static Semaphore semaphore = new Semaphore(3,true);
public static void main(String[] args) {
//50个线程的线程池
ExecutorService service = Executors.newFixedThreadPool(50);
//100个任务
for (int i = 0; i < 100; i++) {
service.submit(new Task());
}
//关闭线程池
service.shutdown();
}
static class Task implements Runnable{
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "想要拿许可证....");
//真正限制并发量的是 semaphore 的限制而不是线程池的数量
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到了许可证!");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "释放了许可证~~~");
semaphore.release();
}
}
}
# 2.5 注意点
- 获取和释放的数量最好是一致的。
- 一般设置公平。
- 可以由线程 A 获取许可证,然后由线程 B 释放许可证。
- 可以用只有一个许可证的 Semaphore 来实现一个轻量级 CountDownLatch。
# 3. Condition 条件对象
# 3.1 常用方法
- await():让线程进入阻塞状态。
- signal():唤醒被该 condition 阻塞的等待时间最长的一个线程,将其线程状态改为 RUNNABLE。
- signalAll():唤醒所有正在等待的线程。
# 3.2 示例
Condition 是绑定在 Lock 上面的。
public class ConditionDemo {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
void method1(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "条件不满足,开始 await");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "条件已满足,开始执行后续任务..");
}finally {
lock.unlock();
}
}
void method2(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "准备工作完成,唤醒其他线程");
condition.signal();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ConditionDemo conditionDemo = new ConditionDemo();
new Thread(new Runnable() {
@Override
public void run() {
conditionDemo.method1();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
conditionDemo.method2();
}
}).start();
}
}
# 3.3 生产者消费者模式
/**
* 使用 Condition 实现生产者消费者模式
*
* @author Hedon Wang
* @create 2021-03-23 3:23 PM
*/
public class ProducerAndConsumerWithCondition {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
/**
* 消费者
*/
class Consumer extends Thread{
@Override
public void run() {
consume();
}
/**
* 消费
*/
private void consume(){
while (true){
lock.lock();
try {
//没东西了
while (queue.size() == 0){
try {
//消费阻塞
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer poll = queue.poll();
//消费了必然不满
notFull.signalAll();
System.out.println("消费者正在消费产品 " + poll);
}finally {
lock.unlock();
}
}
}
}
/**
* 生产者
*/
class Producer extends Thread{
@Override
public void run() {
produce();
}
/**
* 生产
*/
public void produce(){
while (true){
lock.lock();
try {
//满了
while (queue.size() == queueSize){
System.out.println("队列满了,等待消费~~~");
//生产阻塞
try {
System.out.println("队列满了,生产者暂停生产");
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int i = new Random().nextInt(100);
queue.offer(i);
notEmpty.signalAll();
System.out.println("生产者生产出产品 " + i);
}finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumerWithCondition pC = new ProducerAndConsumerWithCondition();
Producer producer = pC.new Producer();
Consumer consumer = pC.new Consumer();
producer.start();
consumer.start();
}
}
# 3.4 注意点
- Condition 和 Object 的 wait()、notify() 用法和性质上基本一致。
- await() 方法回自动释放已有的 Lock 锁。
- 调用 await() 之前必须先持有锁,否则会抛异常。
# 3.5 优势
Object 的 wait() 和 notify() 持有的锁必须是 synchronized 的那个锁。
而一个 Lock 可以生成多个 Condition,针对不同的竞争条件,更加灵活。
# 4. CyclicBarrier 循环栅栏
# 4.1 作用
- CyclicBarrier 和 CountDownLatch 很类似,都能阻塞一组线程。
- 当有大量线程互相配合,分别计算不同人物,并且需要最后统一汇总的时候,我们可以使用 CyclicBarrier。
- CyclicBarrier 可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就iu撤销,所有线程再统一出发,继续执行剩下的任务。
# 4.2 常用方法
public CyclicBarrier(int parties, Runnable barrierAction)
- 参数1:需要等待的线程数
- 参数2:所有线程集合后要执行的事情。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
await():一个线程到达集合点后 await(),等待其他线程到达集合点。
# 4.3 示例
/**
* 演示 CyclicBarrier 的用法
*
* @author Hedon Wang
* @create 2021-03-23 4:32 PM
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
//当所有人都到了,要做什么事
System.out.println("所有人都到场了,大家统一出发");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new Task(i,cyclicBarrier)).start();
}
}
static class Task implements Runnable{
private int id;
private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier){
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程 " + id + "现在前往集合地点..");
try {
Thread.sleep(new Random().nextInt(3000));
System.out.println("线程 " + id + "已经到达集合地点!!");
cyclicBarrier.await();
System.out.println("线程 " + id + "已经等到所有人都到场了,再继续做自己的事情~~");
}catch (InterruptedException e){
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
输出:
线程 0现在前往集合地点..
线程 3现在前往集合地点..
线程 2现在前往集合地点..
线程 1现在前往集合地点..
线程 4现在前往集合地点..
线程 2已经到达集合地点!!
线程 3已经到达集合地点!!
线程 4已经到达集合地点!!
线程 1已经到达集合地点!!
线程 0已经到达集合地点!!
所有人都到场了,大家统一出发
线程 0已经等到所有人都到场了,再继续做自己的事情~~
线程 3已经等到所有人都到场了,再继续做自己的事情~~
线程 4已经等到所有人都到场了,再继续做自己的事情~~
线程 1已经等到所有人都到场了,再继续做自己的事情~~
线程 2已经等到所有人都到场了,再继续做自己的事情~~
# 4.4 注意点
- CyclickBarrier 是可以重用的,CountDownLatch 不可重用。
- CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待 count = 0,也就是说,CountDownLatch 用于事件(一个线程可以多次 countDown()),但是 CyclicBarrier 用于线程(线程数必须要够了)。