详细分析线程池
使用线程池的好处:
- 降低资源消耗:通过重复使用已创建的线程降低线程创建和销毁的消耗
- 提高资源利用率:当任务到达时,可以不需要创建线程就立即执行
- 提高线程的可管理性:线程池可以统一分配,调优和监控
Executor框架
Executor框架是Java5之后引进的,通过Executor来启动线程比使用Thread的start更好,除了方便管理,效率更好(用线程池实现,节约开销)外,还有助于 this逃逸问题。
this逃逸:在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor框架不仅包括线程池的管理,还提供线程工厂、队列以及拒绝策略等。
Executor框架结构
任务(Runnable / Callable)
执行任务需要实现的Runnable接口或Callable接口。这俩接口的实现类均可被ThreadPoolExecutor 或 ScheduledThreadPoolExecutor执行。
任务执行(Executor)
任务执行机制的核心接口Executor,以及ExecutorService接口。ThreadPoolExecutor和ScheduledThreadPoolExecutor类都实现了ExecutorService接口
ScheduledThreadPoolExecutor 实际上是继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService ,而 ScheduledExecutorService 又实现了 ExecutorService
ThreadPoolExecutor
//AbstractExecutorService实现了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService
ScheduledThreadPoolExecutor
//ScheduledExecutorService继承ExecutorService接口
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
异步计算的结果(Future)
Future接口以及Future接口的实现类FutureTask类都可代表异步计算的结果
当把Runnable接口或Callable皆苦的实现类提交给ThreadPoolExecutor或ScheduledThreadPoolExecutor执行(调用submit()方法会返回一个FutureTask对象)
Executor框架使用
- 主线程首先创建实现Runnable或Callable接口的任务对象
- 把创建完成的实现Runnable或Callable接口的任务对象提交到ExecutorService执行:ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable
task))。 - 如果执行Executor.submit(..),ExecutorService将返回一个Future接口的对象
- 最后,主线程执行FutureTask.get()方法来等待任务执行完成,主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
ThreadPoolExecutor类简单介绍
线程池实现类ThreadPoolExecutor是Executor框架最核心的类
构造方法
ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor饱和策略定义
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
,拒绝新任务的处理ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃掉ThreadPoolExecutor.DiscardOldestPolicy
:丢弃最早未处理的任务请求
Spring通过ThreadPoolExecutor或者ThreadExecutor构造函数创建线程池时,当不指定饱和策略,默认使用ThreadPoolExecutor.AbortPolicy
,抛出异常拒绝新任务,丢弃这个任务的处理。对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。
推荐使用ThreadPoolExecutor构造函数创建线程池
Executors返回线程池对象的弊端
FixedThreadPool
和SingleThreadPool
:允许请求的队列长度过长,可能堆积大量请求,从而导致OOMCachedThreadPool
和ScheduledThreadPool
:允许创建的线程数量过大,可能创建大量的线程,从而导致OOM
线程池原理分析
execute方法,线程池执行任务
addWorker这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则返回false
常见对比
Runnable vs Callable
- Runnable接口不会返回结果或抛出检查异常
- Callable接口返回结果或抛出异常
工具类Executors可以将Runnable转化为Callable。
Executors.callable(Runnable task,Object object);
execute() vs submit()
- execute()方法用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功与否
- submit()方法用于提交需要返回值的任务,线程池返回Future对象,通过这个对象可以判断任务是否执行成功。并通过get方法返回值,get方法会阻塞当前线程直到任务完全
shutdown() vs shutdownNow()
- shutdown():关闭线程池,线程池状态变为SHUTDOWN。线程池不再接受新任务,但队列里的任务得执行完毕
- shutdownNow():关闭线程池,线程状态变为STOP。线程池终止当前正在运行的任务,并停止处理排队的任务并返回正在等待的List。
isTerminated() vs isShutdown()
- isShutDown():当调用shutdown()后返回true
- isTerminated():当调用shutdown()后,并且所有提交任务完成后返回true。
几种常见的线程池详解
FixedThreadPool
介绍
FixedThreadPool,被称为可重用固定线程数的线程池。
/**
* 创建一个可重用固定数量线程的线程池
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
从上面源代码可以看出新创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。
执行任务过程
FixedThreadPool的execute()方法运行示意图
- 如果当前运行的线程数小于核心线程数,如果再来新任务,创建新的线程执行任务
- 当前任务的线程数等于核心线程数,如果再来新任务,将任务加入LinkedBlockingQueue
- 线程池中的线程执行完手头任务,会在循环中反复从LinkedBlockingQueue中获取任务来执行。
不推荐使用FixedThreadPool
FixedThreadPool
使用无界队列LinkedBlockingQueue
队列长度容量为Integer.MAX_VALUE,作为线程池的工作队列会带来如下影响
- 当线程池中的线程数达到核心线程数后,新任务将在无界队列中等待,因此线程池中的线程数不会超过核心线程数
- 由于使用无界队列时,线程池最大线程数将是一个无效参数,因为不可能存在任务队列满的情况,
FixedThreadPool
中corePoolSize
和maximumPoolSize
设为同一值 - 使用无界队列时,keepAliveTime将是一个无效参数
- 运行中的FixedPoolSize不会拒绝新任务,任务多时会导致OOM
SingleThreadExecutor
介绍
SingleThreadExecutor
是只有一个线程的线程池。
/**
*返回只有一个线程的线程池
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
执行任务过程
- 如果当前运行的线程数少于核心线程数,则创建新线程执行任务
- 如果当前线程池中有一个运行的线程后,将任务加入无界队列中
- 线程执行完当前任务后,会在循环中反复从无界队列中获取任务来执行
不推荐使用SingleThreadExecutor
SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM,
CachedThreadPool
介绍
CachedThreadPool是一个会根据需要创建新线程的线程池
/**
* 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
CachedThreadPool 的 核心线程数被设置为0,最大线程数设置为Integer.MAX_VALUE,无界的,如果主线程提交任务的速度高于最大线程数中线程处理任务的速度,CachedThreadPool会不断创建新的线程。极端情况下,会导致耗尽CPU和内存资源
执行任务过程介绍
- 首先执行SynchronousQueue.offer(Runnable task)提交任务道任务队列。如果当前最大线程数中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤2
- 当初始maximumPool为空,或者maximumPool中没有空闲线程时,将没有线程执行SynchronousQueue.pool(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失败,此时CachedThreadPool会创建新线程执行任务,execute方法执行完成
不推荐使用CachedThreadPool
CachedThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,导致OOM
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor主要用来给定的延迟后运行任务,或定期执行任务。基本不会用到,有其他方案代替比如quartz
简介
ScheduledThreadPoolExecutor使用的任务队列DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的任务进行排序,执行所需时间短的放前面先被执行(ScheduledFutureTask的time变量小的先执行),如果执行所需时间相同则先提交的任务将先被执行。(squenceNumber变量小的先执行)
运行机制
ScheduledThreadPoolExecutor的执行主要分为两大部分:
- 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
- 线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor做了如下修改:
- 使用DelayQueue作为任务队列
- 获取任务的方法不同
- 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor执行周期任务的步骤
- 线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是值ScheduledFutureTask的time大于等于当前系统时间
- 线程1执行这个ScheduledFutureTask
- 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间
- 线程1把这个修改time之后的ScheduledFutureTask返回DelayQueue中(DelayQueue.add())
线程池大小确定
上下文切换:多线程编程中一般线程的个数都大于CPU核心的个数,而一个CPU核心在任意时刻只能被一个线程使用,为了让所有线程都能得到有效执行,CPU采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于上下文切换。
概况来说:当前任务在执行完CPU时间片切换到另一任务之前会保存自己的状态,以便下次切换回时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换
- CPU密集型任务(N+1):这种任务消耗的主要是CPU资源,可以将线程数设置为N(CPU核心数)+1,比CPU核心数多一个线程是为了防止线程偶发的缺页中断,或其他原因导致的任务暂停而带来的影响。一旦任务暂停,CPU处于空闲状态,而这个情况下,多出来一个线程可以充分利用CPU空闲时间
- I/O密集型任务(2N):这种任务系统会用大部分的时间来处理I/O交互,而线程在处理I/O的时间内不会占用CPU来处理,这时可以将CPU交给其他线程使用。在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是CPU密集任务还是I/O密集任务
CPU密集任务是利用CPU计算能力的任务。比如在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类IO密集型,这类任务的特点就是CPU计算耗费时间相比等待IO操作完成的时间来说很少,大部分时间花在等待IO操作完成上。
参考
- 本文链接:https://wentianhao.github.io/2021/08/18/%E7%BA%BF%E7%A8%8B%E6%B1%A0/
- 版权声明:本博客所有文章除特别声明外,均默认采用 许可协议。
若没有本文 Issue,您可以使用 Comment 模版新建。