# 一、线程池
# 1. 作用
- 如果不使用线程池,每个任务都新开一个线程处理,线程的创建与销毁开销比较大,过多的线程也会占用太多内存。
- 加快响应速度。
- 合理利用 CPU 和内存。
- 便于统一管理线程和任务。
# 2. 属性
构造函数参数名 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程数 |
maxPoolSize | int | 最多线程数 |
keepAliveTime | long | 保持存活时间 |
workQueue | BlockingQueue | 任务存储队列 |
threadFactory | ThreadFactory | 当线程池需要新的线程的时候,会使用 ThreadFactory 来创建 |
handler | RejectedExecutionHandler | 由于线程池无法接受你所提交的任务的拒绝策略 |
# 2.1 corePoolSize 和 maxPoolSize
corePoolSize
指的是核心线程数:线程在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时再创建新线程去执行任务。- 线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程有一个上限,这就是最大量
maxPoolSize
。
# 添加线程规则
1. 如果线程数小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行任务。
2. 如果线程数大于或等于 corePoolSize 但少于 maxPoolSize 且任务队列还没满,则将任务放队列。
3. 如果队列已满,并且先线程数小于 maxPoolSize,则创建一个新的线程来运行任务。
4. 如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。
增减线程的特点:
- 通过设置 corePoolSize 和 maxPoolSize 相同,就可以创建固定大小的线程池。
- 线程池希望保持较少的线程数,并且只有在负载比较大的时候才增加它。
- 通过设置 maxPoolSize 为很高的值,例如 Integer.MAX_VLAUE,可以允许线程池容纳任意数量的并发任务。
- 只有在队列填满时才创建多于 corePoolSize 的线程,所以如果使用无界队列(如 LinkedBlockingQueue),那么线程数就不会超过 corePoolSize。
# 2.2 keepAliveTime
- 如果线程池当前的线程数多于 corePoolSize,那么如果多余的线程空间时间超过 keepAliveTime,它们就会被终止。
# 2.3 workQueue
有 3 种最常见的队列类型:
- 直接交换:SynchronousQueue
- 队列本身没有容量,只是作为缓冲。
- 无界队列:LinkedBlockingQueue
- 可能造成内存浪费和 OOM。
- 有界队列:ArrayBlockingQueue
# 2.4 ThreadFactory
新的线程是由 ThreadFactory 创建的,默认使用
Executors.defaultThreadFactory()
,创建出来的线程都在一个线程组,拥有同样的NORM_PRIORITY
优先级并且都不是守护线程。/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { //使用原子类,避免线程重名 private static final AtomicInteger poolNumber = new AtomicInteger(1); //默认线程组 private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; //构造函数 DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } //创建新线程 public Thread newThread(Runnable r) { //同样是 new 一个 Thread 对象出来 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); //非守护线程 if (t.isDaemon()) t.setDaemon(false); //优先级为 NORM_PRIORITY if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
如果自己指定 ThreadFactory,那么就可以改变线程名、线程组、优先级、是否为守护线程等。
# 2.5 handler
- 当线程数到达 maxPoolSize 且 workQueue 已经满了的时候,拒绝任务时该怎么做。
# 3. 创建与停止
# 3.1 系统内置的线程池
- newFixedThreadPool:固定线程数量的线程池。
- newSingleThreadExecutor:只有一个线程的线程池。
- newCachedThreadPool:无界线程池,具有自动回收线程的功能。
- newScheduledThreadPool:支持定时和周期执行线程的线程池。
- newWorkStealingPool:具有抢占式操作的线程池,这个线程池不会保证任务的顺序执行。(JDK8 新增,构造方法传入并发线程数)
一般还是自定义线程池比较好,因为默认的线程池可能会 OOM 而且也不完全贴合我们的实际业务需求。
# 3.2 合适的线程数量
- CPU 密集型(加密、计算 hash 等):最佳线程数为 CPU 核心数的 1-2 倍左右。
- 耗时 IO 型(读写数据库、文件、网络读写等):最佳线程数一般会大于 CPU 核心数很多倍,以 JVM 线程监控显示繁忙情况为依据,保证线程空间可以衔接上,参考 Brain Goetz 推荐的计算方法:
- 线程数 = CPU 核心数 * (1+平均等待时间/平均工作时间)
# 3.3 停止线程池
- void shutdown():拒绝新任务,执行完已经接受的任务后就停止线程池。
- boolean isShutdown():判断线程池是否执行过 shutdown() 进入停止状态了。
- boolean isTerminated():判断线程池是否执行过 shutdown() 并且线程已经执行完任务了。
- boolean awaitTermination(long timeout, TimeUnit unit):检查传入时间内线程池是否已经终止。
- List<Runnable> shutdownNow():调用 interrupt() 去停止已经开始的任务,然后返回所有还没有执行的任务。
# 4. 拒绝策略
# 4.1 拒绝时机
- 当 Executor 关闭时,提交新任务会被拒绝。
- 当线程和队列都已经饱和的时候,新任务会被拒绝。
# 4.2 四种拒绝策略
- AbortPolicy:抛出异常,表明拒绝任务。
- DiscardPolicy:默默丢弃,不进行通知。
- DiscardOldestPolicy:丢弃最老的任务。
- CallerRunsPolicy:让提交任务的线程自己去执行任务
- 避免任务损失。
- 是一种负反馈,主线程执行任务需要时间,减慢了任务提交速度。
# 5. 钩子方法
可以在线程执行前后做一些东西,比如说日志。
下面这个例子实现一个可暂停的线程池,我们在线程执行前加一个钩子方法,让它在执行线程之前先做一些事情。
/**
* 演示每个线程执行前面放置一些钩子函数
*
* @author Hedon Wang
* @create 2021-03-16 9:40 PM
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock lock = new ReentrantLock();
private Condition unPaused = lock.newCondition();
/**
* 构造方法
*
* @param corePoolSize
* @param maximumPoolSize
* @param keepAliveTime
* @param unit
* @param workQueue
*/
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
/**
* 钩子方法① —— 在线程执行之前执行的方法
* @param t
* @param r
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try{
while (isPaused){
//休眠线程
unPaused.await();
}
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
/**
* 暂停
*/
public void pause(){
lock.lock();
try {
isPaused = true;
}finally {
lock.unlock();
}
}
/**
* 恢复
*/
public void resume(){
lock.lock();
try {
isPaused = false;
//唤醒所有睡眠的线程
unPaused.signalAll();
}finally {
lock.unlock();
}
}
/**
* main 方法测试
*/
public static void main(String[] args) throws InterruptedException {
//实例化线程池
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
//任务
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//创建10000个任务
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
//一段时间后暂停线程
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
//再一段时间后恢复线程
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}
# 6. 源码
# 6.1 Executor 家族
# Executor
- 一个接口
- 就只有 execute(Runnable command) 这一个方法
public interface Executor {
void execute(Runnable command);
}
# ExecutorService
public interface ExecutorService extends Executor {
/**
* 让线程池进入停止状态:拒绝新任务,执行完已接受任务后停止。
*/
void shutdown();
/**
* 立刻停止线程池并返回等待中的任务。
*/
List<Runnable> shutdownNow();
/**
* 判断线程池是否进入停止状态。
*/
boolean isShutdown();
/**
* 判断线程池是否已经完成停止。
*/
boolean isTerminated();
/**
* 判断一定时间内线程池是否已经完成停止。
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个可以返回值的任务以执行并返回一个 Future 代表任务的未决结果。
*/
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
/**
* 执行给定的任务,当所有任务完成时,返回保存其状态和结果的期货列表。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
# Executors
- 一个工具类
# 6.2 线程池实现线程复用的原理
相同线程执行不同的任务。
runWorker(Worker w):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到这个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//执行完任务,就检测是否有新的任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//同一个线程执行不同的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
# 6.3 线程池状态
- RUNNING:接收新任务并处理排队任务。
- SHUTDOWN:不接受新任务,但处理排队任务。
- STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务。
- TIDYING:整洁。所有任务都已停止,workerCount 为 0 时,线程会转到 TIDYING 状态,并将允许 terminate() 钩子方法。
- TERMINATED:terminate() 运行完成。
# 6.4 execute()
execute(Runnable command) :
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 1. 如果线程数还没达到 corePoolSize,那就增加线程
*/
int c = ctl.get(); //ctl = new AtomicInteger(ctlOf(RUNNING, 0));
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为现有线程自上次检查后就死掉了)或该池自进入此方法后就关闭了。因此,我们重新检查状态,并在必要时回滚排队(如果已停止),或者在没有线程的情况下启动新线程。
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 3. 如果我们无法将任务排队,则尝试添加一个新线程。如果失败,则表明我们已关闭或已饱和,因此拒绝该任务。
*/
else if (!addWorker(command, false))
reject(command);
}
# 7. 注意点
- 避免任务堆积
- 避免线程数过度增加
- 排除线程泄露
# 8. 面试题
# 1. 线程创建和销毁的时机?
① 线程创建:
- 如果线程数小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行任务。
- 如果线程数大于或等于 corePoolSize 但少于 maxPoolSize 且任务队列还没满,则将任务放队列。
- 如果队列已满,并且先线程数小于 maxPoolSize,则创建一个新的线程来运行任务。
- 如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。
② 线程销毁:
- 超过 corePoolSize 的那些线程,如果超过了 keepAliveTime 而且没有任务执行的时候就会被回收。
- 专门设置 corePoolSize 里面的线程也可以回收的话,那么它们超过 keepAliveTime 且没有任务执行的时候的也会被回收。