# 一、线程池

# 1. 作用

  1. 如果不使用线程池,每个任务都新开一个线程处理,线程的创建与销毁开销比较大,过多的线程也会占用太多内存。
  2. 加快响应速度。
  3. 合理利用 CPU 和内存。
  4. 便于统一管理线程和任务。

# 2. 属性

构造函数参数名 类型 含义
corePoolSize int 核心线程数
maxPoolSize int 最多线程数
keepAliveTime long 保持存活时间
workQueue BlockingQueue 任务存储队列
threadFactory ThreadFactory 当线程池需要新的线程的时候,会使用 ThreadFactory 来创建
handler RejectedExecutionHandler 由于线程池无法接受你所提交的任务的拒绝策略

# 2.1 corePoolSize 和 maxPoolSize

  • corePoolSize 指的是核心线程数:线程在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时再创建新线程去执行任务。
  • 线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程有一个上限,这就是最大量 maxPoolSize
# 添加线程规则
1. 如果线程数小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行任务。
2. 如果线程数大于或等于 corePoolSize 但少于 maxPoolSize 且任务队列还没满,则将任务放队列。
3. 如果队列已满,并且先线程数小于 maxPoolSize,则创建一个新的线程来运行任务。
4. 如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。

增减线程的特点:

  1. 通过设置 corePoolSize 和 maxPoolSize 相同,就可以创建固定大小的线程池。
  2. 线程池希望保持较少的线程数,并且只有在负载比较大的时候才增加它。
  3. 通过设置 maxPoolSize 为很高的值,例如 Integer.MAX_VLAUE,可以允许线程池容纳任意数量的并发任务。
  4. 只有在队列填满时才创建多于 corePoolSize 的线程,所以如果使用无界队列(如 LinkedBlockingQueue),那么线程数就不会超过 corePoolSize。

# 2.2 keepAliveTime

  • 如果线程池当前的线程数多于 corePoolSize,那么如果多余的线程空间时间超过 keepAliveTime,它们就会被终止。

# 2.3 workQueue

有 3 种最常见的队列类型:

  1. 直接交换:SynchronousQueue
    • 队列本身没有容量,只是作为缓冲。
  2. 无界队列:LinkedBlockingQueue
    • 可能造成内存浪费和 OOM。
  3. 有界队列:ArrayBlockingQueue

# 2.4 ThreadFactory

  • 新的线程是由 ThreadFactory 创建的,默认使用 Executors.defaultThreadFactory(),创建出来的线程都在一个线程组,拥有同样的 NORM_PRIORITY 优先级并且都不是守护线程。

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
      	//使用原子类,避免线程重名
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
      	//默认线程组
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    		
      	//构造函数
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    		
      	//创建新线程
        public Thread newThread(Runnable r) {
          	//同样是 new 一个 Thread 对象出来
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
          	//非守护线程
            if (t.isDaemon())
                t.setDaemon(false);
          	//优先级为 NORM_PRIORITY
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
  • 如果自己指定 ThreadFactory,那么就可以改变线程名、线程组、优先级、是否为守护线程等。

# 2.5 handler

  • 当线程数到达 maxPoolSize 且 workQueue 已经满了的时候,拒绝任务时该怎么做。

# 3. 创建与停止

# 3.1 系统内置的线程池

image-20210316211728853

  • newFixedThreadPool:固定线程数量的线程池。
  • newSingleThreadExecutor:只有一个线程的线程池。
  • newCachedThreadPool:无界线程池,具有自动回收线程的功能。
  • newScheduledThreadPool:支持定时和周期执行线程的线程池。
  • newWorkStealingPool:具有抢占式操作的线程池,这个线程池不会保证任务的顺序执行。(JDK8 新增,构造方法传入并发线程数)

一般还是自定义线程池比较好,因为默认的线程池可能会 OOM 而且也不完全贴合我们的实际业务需求。

# 3.2 合适的线程数量

  • CPU 密集型(加密、计算 hash 等):最佳线程数为 CPU 核心数的 1-2 倍左右。
  • 耗时 IO 型(读写数据库、文件、网络读写等):最佳线程数一般会大于 CPU 核心数很多倍,以 JVM 线程监控显示繁忙情况为依据,保证线程空间可以衔接上,参考 Brain Goetz 推荐的计算方法:
    • 线程数 = CPU 核心数 * (1+平均等待时间/平均工作时间)

# 3.3 停止线程池

  • void shutdown():拒绝新任务,执行完已经接受的任务后就停止线程池。
  • boolean isShutdown():判断线程池是否执行过 shutdown() 进入停止状态了。
  • boolean isTerminated():判断线程池是否执行过 shutdown() 并且线程已经执行完任务了。
  • boolean awaitTermination(long timeout, TimeUnit unit):检查传入时间内线程池是否已经终止。
  • List<Runnable> shutdownNow():调用 interrupt() 去停止已经开始的任务,然后返回所有还没有执行的任务。

# 4. 拒绝策略

# 4.1 拒绝时机

  1. 当 Executor 关闭时,提交新任务会被拒绝。
  2. 当线程和队列都已经饱和的时候,新任务会被拒绝。

# 4.2 四种拒绝策略

  • AbortPolicy:抛出异常,表明拒绝任务。
  • DiscardPolicy:默默丢弃,不进行通知。
  • DiscardOldestPolicy:丢弃最老的任务。
  • CallerRunsPolicy:让提交任务的线程自己去执行任务
    1. 避免任务损失。
    2. 是一种负反馈,主线程执行任务需要时间,减慢了任务提交速度。

# 5. 钩子方法

可以在线程执行前后做一些东西,比如说日志。

下面这个例子实现一个可暂停的线程池,我们在线程执行前加一个钩子方法,让它在执行线程之前先做一些事情。

/**
 * 演示每个线程执行前面放置一些钩子函数
 *
 * @author Hedon Wang
 * @create 2021-03-16 9:40 PM
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private boolean isPaused;
    private ReentrantLock lock = new ReentrantLock();
    private Condition unPaused = lock.newCondition();

    /**
     * 构造方法
     *
     * @param corePoolSize
     * @param maximumPoolSize
     * @param keepAliveTime
     * @param unit
     * @param workQueue
     */
    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }


    /**
     * 钩子方法① —— 在线程执行之前执行的方法
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try{
            while (isPaused){
                //休眠线程
                unPaused.await();
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 暂停
     */
    public void pause(){
        lock.lock();
        try {
            isPaused = true;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 恢复
     */
    public void resume(){
        lock.lock();
        try {
            isPaused = false;
            //唤醒所有睡眠的线程
            unPaused.signalAll();
        }finally {
            lock.unlock();
        }
    }

		/**
		 * main 方法测试
		 */
    public static void main(String[] args) throws InterruptedException {
				//实例化线程池
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
				//任务
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
      	//创建10000个任务
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        //一段时间后暂停线程
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
      	//再一段时间后恢复线程
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了");
    }
}

image-20210316214854687

# 6. 源码

# 6.1 Executor 家族

image-20210316215436506

# Executor

  • 一个接口
  • 就只有 execute(Runnable command) 这一个方法
public interface Executor {
    void execute(Runnable command);
}
# ExecutorService
public interface ExecutorService extends Executor {

    /**
     * 让线程池进入停止状态:拒绝新任务,执行完已接受任务后停止。
     */
    void shutdown();

    /**
     * 立刻停止线程池并返回等待中的任务。
     */
    List<Runnable> shutdownNow();

    /**
     * 判断线程池是否进入停止状态。
     */
    boolean isShutdown();

    /**
     * 判断线程池是否已经完成停止。
     */
    boolean isTerminated();

    /**
     * 判断一定时间内线程池是否已经完成停止。
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个可以返回值的任务以执行并返回一个 Future 代表任务的未决结果。
     */
    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    /**
     * 执行给定的任务,当所有任务完成时,返回保存其状态和结果的期货列表。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

# Executors

  • 一个工具类

# 6.2 线程池实现线程复用的原理

相同线程执行不同的任务。

runWorker(Worker w):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
  	//拿到这个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
      	//执行完任务,就检测是否有新的任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                  	//同一个线程执行不同的任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

# 6.3 线程池状态

  • RUNNING:接收新任务并处理排队任务。
  • SHUTDOWN:不接受新任务,但处理排队任务。
  • STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务。
  • TIDYING:整洁。所有任务都已停止,workerCount 为 0 时,线程会转到 TIDYING 状态,并将允许 terminate() 钩子方法。
  • TERMINATED:terminate() 运行完成。

# 6.4 execute()

execute(Runnable command) :

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * 1. 如果线程数还没达到 corePoolSize,那就增加线程
     */
    int c = ctl.get();   //ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    /**
     * 2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为现有线程自上次检查后就死掉了)或该池自进入此方法后就关闭了。因此,我们重新检查状态,并在必要时回滚排队(如果已停止),或者在没有线程的情况下启动新线程。
     */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
  	/**
  	 * 3. 如果我们无法将任务排队,则尝试添加一个新线程。如果失败,则表明我们已关闭或已饱和,因此拒绝该任务。
  	 */
    else if (!addWorker(command, false))
        reject(command);
}

# 7. 注意点

  • 避免任务堆积
  • 避免线程数过度增加
  • 排除线程泄露

# 8. 面试题

# 1. 线程创建和销毁的时机?

① 线程创建:

  1. 如果线程数小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行任务。
  2. 如果线程数大于或等于 corePoolSize 但少于 maxPoolSize 且任务队列还没满,则将任务放队列。
  3. 如果队列已满,并且先线程数小于 maxPoolSize,则创建一个新的线程来运行任务。
  4. 如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。

image-20210316202321047

② 线程销毁:

  1. 超过 corePoolSize 的那些线程,如果超过了 keepAliveTime 而且没有任务执行的时候就会被回收。
  2. 专门设置 corePoolSize 里面的线程也可以回收的话,那么它们超过 keepAliveTime 且没有任务执行的时候的也会被回收。
上次更新: 10/27/2021, 10:33:23 PM