# 八、控制并发流程

image-20210323105217399

# 1. CountDownLatch 倒计时门闩

# 1.1 作用

  • 倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作。

例如:

  • 购物拼团
  • 人满发车

# 1.2 主要方法

  • public CountDownLatch(int count)

    /** 唯一的构造器,count 是需要倒数的数 */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
  • public void await():线程会被挂起,它会等到 count = 0 才继续执行

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
  • void countDown():将 count 值减 1,直到 0 时,等待的线程会被唤醒。

    public void countDown() {
        sync.releaseShared(1);
    }
    
image-20210323110121092

# 1.3 两种用法

  1. 一个线程等待多个线程执行完毕,再继续自己的工作。

    /**
     * 演示:工产中的质检,5个工人检查,所有人都认为通过才通过
     *
     * @author Hedon Wang
     * @create 2021-03-23 10:56 AM
     */
    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(5);
            ExecutorService service = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                int number = i+1;
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(new Random().nextInt(1000) + 1000);
                            System.out.println(number + "号工人完成质检");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                };
                service.submit(runnable);
            }
            System.out.println("等待5个人检查完成....");
            //等待count=0
            countDownLatch.await();
            System.out.println("5个人都已经质检完毕!!!");
    
        }
    }
    
  2. 多个线程等待某一个线程的信号,同时开始执行。

    • 竞赛中多位运动员等待裁判鸣枪后同时出发。
    /**
     * 演示:5名运动员等待裁判一声令下,所有人同时开始跑步
     *
     * @author Hedon Wang
     * @create 2021-03-23 10:56 AM
     */
    public class CountDownLatchDemo2 {
        public static void main(String[] args) throws InterruptedException {
    
            CountDownLatch latch = new CountDownLatch(1);
    
            ExecutorService service = Executors.newFixedThreadPool(5);
    
            for (int i = 0; i < 5; i++) {
                int number = 1 + i;
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(number + "号运动员准备完毕,等待发令枪..");
                        try {
                            latch.await();
                            System.out.println(number + "号运动员开始跑步");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                service.submit(runnable);
            }
            Thread.sleep(5000);
            System.out.println("发令枪响,比赛开始!!");
          	//下令
            latch.countDown();
        }
    }
    

# 1.4 注意点

  • CountDownLatch 不能重用。如果想重用,可以新建一个 CountDownLatch 或者用 CyclicBarrier

# 2. Semaphore 信号量

# 2.1 作用

  • Semaphore 用来限制或管理数量有限的资源的使用情况。
  • 信号量的作用是维护一个“许可证”的计数,线程可以“获取”许可证,那信号量剩余的许可证就 -1。当信号量锁拥有的许可证数量为 0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证

# 2.2 使用流程

  1. 初始化 Semaphore 并指定许可证数量
  2. 在需要许可证的代码前加 acquire() 或者 acquireUninterruptibly() 方法。
  3. 当任务执行完成后,要调用 release() 来释放许可证。

# 2.3 主要方法

  • public Semaphore(int permits, boolean fair)

    • 参数1:许可证数量
    • 参数2:是否公平
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
  • acquire():请求许可证(可以指定数量),可以响应中断。

  • acquireUninterruptibly():请求许可证,不能响应中断。

  • tryAcquire():尝试获取许可证,失败则干别的事。

  • release():释放许可证(可以指定数量)。

# 2.4 示例

/**
 * 演示:50个线程执行100个任务,但是用信号量来限制每次只能3个任务同时进行。
 *
 * @author Hedon Wang
 * @create 2021-03-23 11:24 AM
 */
public class SemaphoreDemo1 {

    //许可证中心
    static Semaphore semaphore = new Semaphore(3,true);

    public static void main(String[] args) {

        //50个线程的线程池
        ExecutorService service = Executors.newFixedThreadPool(50);
        //100个任务
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        //关闭线程池
        service.shutdown();
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "想要拿许可证....");
                //真正限制并发量的是 semaphore 的限制而不是线程池的数量
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证!");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "释放了许可证~~~");
            semaphore.release();
        }
    }
}

# 2.5 注意点

  1. 获取和释放的数量最好是一致的。
  2. 一般设置公平。
  3. 可以由线程 A 获取许可证,然后由线程 B 释放许可证。
  4. 可以用只有一个许可证的 Semaphore 来实现一个轻量级 CountDownLatch。

# 3. Condition 条件对象

# 3.1 常用方法

  • await():让线程进入阻塞状态。
  • signal():唤醒被该 condition 阻塞的等待时间最长的一个线程,将其线程状态改为 RUNNABLE。
  • signalAll():唤醒所有正在等待的线程。

# 3.2 示例

Condition 是绑定在 Lock 上面的。

public class ConditionDemo {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    void method1(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "条件不满足,开始 await");
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "条件已满足,开始执行后续任务..");
        }finally {
            lock.unlock();
        }
    }

    void method2(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "准备工作完成,唤醒其他线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {

       ConditionDemo conditionDemo = new ConditionDemo();
       new Thread(new Runnable() {
           @Override
           public void run() {
               conditionDemo.method1();
           }
       }).start();

       new Thread(new Runnable() {
           @Override
           public void run() {
               conditionDemo.method2();
           }
       }).start();
    }
}

# 3.3 生产者消费者模式

/**
 * 使用 Condition 实现生产者消费者模式
 *
 * @author Hedon Wang
 * @create 2021-03-23 3:23 PM
 */
public class ProducerAndConsumerWithCondition {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private ReentrantLock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    /**
     * 消费者
     */
    class Consumer extends Thread{

        @Override
        public void run() {
            consume();
        }

        /**
         * 消费
         */
        private void consume(){
            while (true){
                lock.lock();
                try {
                    //没东西了
                    while (queue.size() == 0){
                        try {
                            //消费阻塞
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer poll = queue.poll();
                    //消费了必然不满
                    notFull.signalAll();
                    System.out.println("消费者正在消费产品 " + poll);
                }finally {
                    lock.unlock();
                }
            }

        }
    }

    /**
     * 生产者
     */
    class Producer extends Thread{
        @Override
        public void run() {
            produce();
        }

        /**
         * 生产
         */
        public void produce(){
            while (true){
                lock.lock();
                try {
                    //满了
                    while (queue.size() == queueSize){
                        System.out.println("队列满了,等待消费~~~");
                        //生产阻塞
                        try {
                            System.out.println("队列满了,生产者暂停生产");
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    int i = new Random().nextInt(100);
                    queue.offer(i);
                    notEmpty.signalAll();
                    System.out.println("生产者生产出产品 " + i);
                }finally {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        ProducerAndConsumerWithCondition pC = new ProducerAndConsumerWithCondition();
        Producer producer = pC.new Producer();
        Consumer consumer = pC.new Consumer();
        producer.start();
        consumer.start();
    }
}

# 3.4 注意点

  1. Condition 和 Object 的 wait()、notify() 用法和性质上基本一致。
  2. await() 方法回自动释放已有的 Lock 锁。
  3. 调用 await() 之前必须先持有锁,否则会抛异常。

# 3.5 优势

Object 的 wait() 和 notify() 持有的锁必须是 synchronized 的那个锁。

而一个 Lock 可以生成多个 Condition,针对不同的竞争条件,更加灵活。

# 4. CyclicBarrier 循环栅栏

# 4.1 作用

  • CyclicBarrier 和 CountDownLatch 很类似,都能阻塞一组线程。
  • 当有大量线程互相配合,分别计算不同人物,并且需要最后统一汇总的时候,我们可以使用 CyclicBarrier。
  • CyclicBarrier 可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就iu撤销,所有线程再统一出发,继续执行剩下的任务。

# 4.2 常用方法

  • public CyclicBarrier(int parties, Runnable barrierAction)

    • 参数1:需要等待的线程数
    • 参数2:所有线程集合后要执行的事情。
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
  • await():一个线程到达集合点后 await(),等待其他线程到达集合点。

# 4.3 示例

/**
 * 演示 CyclicBarrier 的用法
 * 
 * @author Hedon Wang
 * @create 2021-03-23 4:32 PM
 */
public class CyclicBarrierDemo {

    public static void main(String[] args) {

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                //当所有人都到了,要做什么事
                System.out.println("所有人都到场了,大家统一出发");
            }
        });

        for (int i = 0; i < 5; i++) {
            new Thread(new Task(i,cyclicBarrier)).start();
        }

    }

    static class Task implements Runnable{
        private int id;
        private CyclicBarrier cyclicBarrier;
        public Task(int id, CyclicBarrier cyclicBarrier){
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程 " + id + "现在前往集合地点..");
            try {
                Thread.sleep(new Random().nextInt(3000));
                System.out.println("线程 " + id + "已经到达集合地点!!");
                cyclicBarrier.await();
                System.out.println("线程 " + id + "已经等到所有人都到场了,再继续做自己的事情~~");
            }catch (InterruptedException e){
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

输出:

线程 0现在前往集合地点..
线程 3现在前往集合地点..
线程 2现在前往集合地点..
线程 1现在前往集合地点..
线程 4现在前往集合地点..
线程 2已经到达集合地点!!
线程 3已经到达集合地点!!
线程 4已经到达集合地点!!
线程 1已经到达集合地点!!
线程 0已经到达集合地点!!
所有人都到场了,大家统一出发
线程 0已经等到所有人都到场了,再继续做自己的事情~~
线程 3已经等到所有人都到场了,再继续做自己的事情~~
线程 4已经等到所有人都到场了,再继续做自己的事情~~
线程 1已经等到所有人都到场了,再继续做自己的事情~~
线程 2已经等到所有人都到场了,再继续做自己的事情~~

# 4.4 注意点

  1. CyclickBarrier 是可以重用的,CountDownLatch 不可重用。
  2. CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待 count = 0,也就是说,CountDownLatch 用于事件(一个线程可以多次 countDown()),但是 CyclicBarrier 用于线程(线程数必须要够了)。
上次更新: 9/17/2021, 12:28:06 PM