进一步补充并发知识
synchronized 关键字
synchronized
关键字解决多个线程之间访问资源的同步性synchronized
关键字可以保证被它修饰的方法或代码块在任意时刻只能有一个线程执行
synchronized 使用
synchronized
关键字最主要的三种使用方法:
- 修饰实例方法:作用于当前对象实例加锁,进入同步代码前要获得当前对象实例的锁
synchronized void method(){ }
- 修饰静态方法:给当前类加锁,会作用于类的所有对象实例,进入同步代码前要获得 当前class的锁。因为静态成员不属于任何一个实例对象,是类成员(static表明这是该类的一个静态资源,不管new了多少个对象,只有一份)。所以如果线程A调用一个实例对象的非静态
synchronized
方法,而线程B需要调用这个实例对象所属类的静态synchronized
方法,是允许的,不会发生互斥现象,因为 访问静态synchronized
方法占用的锁是当前类的锁,而访问非静态synchronized
方法占用的是当前实例对象锁。
synchronized static void method(){
// 业务代码
}
- 修饰代码块:指定加锁对象,对给定对象/类加锁
synchronized(this|object)
:表示进入同步代码前要获得给定对象的锁
synchronized(类.class)
:表示进入同步代码前要获得当前类的锁
synchronized(this) {
}
小结
- synchronized 关键字加到static静态方法和synchronized(class)代码块上都是给 Class对象上锁
- synchronized 关键字加到实例方法上是给对象实例上锁。
- synchronized 关键字加到同步方法块,锁的是synchronized括号配置的对象
- 尽量不要使用
synchronized(String a)
因为JVM中,字符串常量池具有缓存功能
双重校验锁实现对象单例(线程安全)
public class Singleton{
private volatile static Singleton uniqueInstance;
private Singleton() {
}
public static Singleton getUniqueInstance() {
//先判断对象是否已经实例化,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
// 类对象加锁
synchronized (Singleton.class) {
if (uniqueInstance == null){
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}
uniqueInstance
采用 volatile
关键字修饰很有必要,uniqueInstance = new Singleton()
分为三步执行
- 为uniqueInstance分配内存空间
- 初始化uniqueInstance
- 将uniqueInstance指向分配的内存地址
JVM具有指令重排特性,执行顺序可能会变为1->3->2,指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没初始化的实例。例如,线程T1执行了1 和 3 ,此时 T2 调用getUniqueInstance()发现 uniqueInstance不为空,因此返回uniqueInstance,但是此时uniqueInstance还未被初始化。
使用 volatile 可以禁止 JVM 的指令重排,保证在多线程环境下也能正常运行
构造方法不能使用synchronized关键字修饰,构造方法本身就是线程安全的,不存在同步构造方法一说
底层原理
synchronized关键字底层原理属于JVM层面
synchronized 同步语句块
synchronized同步语句块的实现使用的是monitorenter 和 monitorexit指令,其中monitorenter指令指向同步代码块的开始位置,monitorexit指令则指明同步代码块的结束位置
当执行 monitorenter 指令时,线程试图获取锁也就是获取 对象监视器 monitor 的持有权,wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。
在执行monitorenter时,会尝试获取对象的锁,如果锁的计数器为 0 则表示锁可以被获取,获取后将锁计数器设为 1 也就是加 1。
在执行 monitorexit 指令后,将锁计数器设为 0,表明锁被释放。如果获取对象锁失败,那当前线程就要阻塞等待,直到锁被另外一个线程释放为止。
synchronized 修饰方法
synchronized 修饰的方法并没有 monitorenter 指令和monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。JVM 通过该 ACC_SYNCHRONIZED 访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。
小结
synchronized 同步语句块的实现使用的是 monitorenter和monitorexit指令,其中monitor enter指令指向同步代码块的开始位置,monitor exit指令则指明同步代码块的结束位置。
synchronized 修饰方法时并没有 使用 monitorenter和 monitorexit,而是使用的是 ACC_SYNCHRONIZED 标识,指明了这个方法是一个同步方法
两者的本质都是 对对象监视器monitor的获取
jdk1.6之后优化
JDK1.6之后对锁的实现引入大量优化,如 偏向锁、轻量级锁、自旋锁、适应性锁、锁消除、锁粗化等技术来较少锁操作的开销
锁主要存在四种状态,无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态。随着竞争的激烈而不断升级,锁可以升级不可降级,这种策略是为了提高获得锁和释放锁的效率
synchronized 和 ReentrantLock 区别
- 两者都是可重入锁
可重入锁:自己可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,这个对象锁还没释放,当其再次想获取这个对象的锁时还是可以获取的,如果不可锁重入的话,会造成死锁。同一个线程每次获取锁,锁的计数器自增1,所以要等到锁的计数器下降为0时才能释放锁
- synchronized依赖于JVM 而 ReentrantLock 依赖于 API
synchronized是依赖于JVM实现的,优化都是在虚拟机层面,没有暴露给我们
ReentrantLock 是JDK 层面实现的,需要lock()、unlock()方法配合try/finally语句块来完成,可以通过查看源代码,查看实现
- ReentrantLock比synchronized增加了一些高级功能
- 等待可中断:ReentrantLock提供了一种能够终端等待锁的线程机制,通过lock.lockInterruptibly()来实现这个机制。正在等待的线程可以选择放弃等待,改为处理其他事情
- 可实现公平锁:ReentrantLock可以指定是公平锁还是非公平锁,而synchronized只能是非公平锁。所谓公平锁是先等的线程先获得锁。ReentrantLock默认是非公平的,可以通过ReentrantLock类的
ReentrantLock(boolean fair)
构造方法来指定是否公平 - 可实现选择性通知(锁可以绑定多个条件):synchronized关键字与wait()和notify()/notifyAll()方法相结合可以实现等待/通知机制。ReentrantLock类当然也可以实现,但是需借助Condition接口与newCondition()方法
Condition是JDK1.5之后的,具有很好的灵活性。可以实现多路通知功能,在一个Lock对象中可以创建多个Condition实例(对象监视器),线程对象可以注册在指定的Condition中,从而有选择性的进行线程通知,在调度线程上更加灵活。
在使用notify()/notifyAll()方法进行通知时,被通知的线程是JVM选择的。
而ReentrantLock类结合Condition实例可以实现“选择性通知”。synchronized关键字就相当于整个Lock对象中只有一个Condition实例,所有的线程都注册在它身上,如果执行notifyAll()方法会通知所有处于等待状态的线程,这样会造成效率问题。而Condition()实例的signalAll()方法只会唤醒注册在该Condition实例中的所有等待线程
volatile 关键字
CPU缓存模型
开发网站后台使用的缓存(比如Redis)是为了解决程序处理速度和访问常规关系型数据库速度不对等的问题。CPU缓存则是为了解决CPU处理速度和内存处理速度不对等的问题
内存可以看作是外存的高速缓存,程序运行的时候,将外存的数据复制到内存,由于内存的处理速度远高于外存,这样就提高了处理速度
小结:CPU Cache 缓存的是内存数据用于解决CPU处理速度和内存不匹配的问题。内存缓存的是硬盘数据用于解决硬盘访问速度过慢的问题。
CPU cache的工作方式
先复制一份数据到CPU Cache中,当CPU需要用到的时候就直接从CPU Cache中读取数据,当运算完成后,再将运算得到的数据写回Main Memory中。但是这样就存在内存缓存不一致的问题! 比如:执行一个i++操作,两个线程同时执行,假设两个线程从CPU Cache中读取的i=1,两个线程做了1++运算之后再写回Main Memory之后 i = 2.而正确结果应该是i=3
CPU 为了解决内存缓存不一致性问题可以通过制定缓存一致协议或者其他手段来解决。
JMM(Java内存模型)
在JDK1.2之前,Java内存模型实现总是从 主存(即共享内存)读取变量,是不需要进行特别注意的,而在当前的Java内存模型下,线程可以把变量保存本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写,这就可能造成一个线程在主存中修改了一个变量的值,但另外一个线程还在继续使用它在寄存器中的变量值的拷贝,造成数据的不一致。
要解决这个问题,就需要把变量声明为volatile,这就指示JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。
volatile 除了防止JVM指令重排,还有一个作用就是保证变量的可见性
并发编程的三个重要特性
- 原子性:一个操作或者多个操作,要么全部执行并且执行过程不会被任何因素打断,要么就都不执行,原子性就像数据库中的事务一样,它们是一个团队,同生共死。
synchronized
可以保证代码片段的原子性 - 可见性:当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看到修改的值。
volatile
保证可见性 - 有序性:程序执行的顺序按照代码的先后顺序执行。
volatile
保证一定的有序性,另外可以通过synchronized和Lock来保证有序性
synchronized 和 volatile 区别
- volatile关键字是线程同步的轻量级实现,所以volatile性能肯定比synchronized关键字好。但volatile只能修饰变量,而synchronized可以修饰方法和代码块
- volatile保证数据的可见性,但不能保证数据的原子性,而synchronized两者都可保证
- volatile关键字主要用于解决多个线程之间的可见性,而synchronized解决的是多个线程之间访问资源的同步性
ThreadLocal
简介
通常情况下,创建的变量可以被任何一个线程访问并修改。JDK中提供的ThreadLocal
类是为了解决 实现每个线程都有自己专属本地变量
ThreadLocal
类让每个线程绑定自己的值,可以将ThreadLocal
类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。
如果创建了ThreadLocal
变量,那么访问这个变量的每个线程都会有这个变量的副本,这也是ThreadLocal
变量名的由来。使用get()
和set()
获取默认值或将其更改为当前线程所存的副本值,避免了线程安全的问题
demoTest
public class ThreadLocalTest implements Runnable {
// SimpleDataFormat 不是线程安全的,所以每个线程都要有自己独立的副本
private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd HHmm"));
public static void main(String[]args) throws InterruptedException {
ThreadLocalTest obj = new ThreadLocalTest();
for (int i=0; i< 3; i++) {
Thread t = new Thread(obj, " " + i);
Thread.sleep(new Random().nextInt(1000));
t.start();
}
}
@Override
public void run() {
System.out.println(" Thread Name = " + Thread.currentThread().getName() + " default Formatter = " + formatter.get().toPattern());
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
formatter.set(new SimpleDateFormat());
System.out.println(" Thread Name= " + Thread.currentThread().getName() + " formatter = " +formatter.get().toPattern());
}
}
输出
Thread Name = 0 default Formatter = yyyyMMdd HHmm
Thread Name= 0 formatter = yy-M-d ah:mm
Thread Name = 1 default Formatter = yyyyMMdd HHmm
Thread Name= 1 formatter = yy-M-d ah:mm
Thread Name = 2 default Formatter = yyyyMMdd HHmm
Thread Name= 2 formatter = yy-M-d ah:mm
Thread-0已经改变了formatter的值,但其他线程默认值与初始化值相等。
ThreadLocal 原理
public class Thread implements Runnable {
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;
//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//......
}
Thread类有一个threadLocals和inheritableThreadLocals变量,都是ThreadLocalMap类型的变量,为ThreadLocal定制化的HashMap。默认情况下两变量都是null,只有当前线程调用ThreadLocal类的set或get方法时才创建。实际调用的是ThreadLocalMap的get和set
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this,value);
else
createMap(t,value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
最终变量是放在当前线程的ThreadLocalMap中,并不是存放在ThreadLocal上,ThreadLocal只是ThreadLocalMap的封装,传递了变量值。ThreadLocal类中可以通过Thread.currentThread()
获取到当前线程对象后,直接通过getMap(Thread t)访问该线程的ThreadLocalMap对象。
每个Thread具备一个ThreadLocalMap,而ThreadLocalMap可以存储以ThreadLocal为key,Object对象为value的键值对。
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
//......
}
比如在同一线程中声明了两个ThreadLocal对象,会使用Thread内部都使用的仅有的那个ThreadLocalMap存放数据,ThreadLocal对象为key,value是ThreadLocal对象调用set方法设置的值
ThreadLocal 内存泄漏问题
ThreadLocalMap中使用的key为ThreadLocal的弱引用,而value是强引用。所以,如果ThreadLocal没有被外部强引用的情况下,在垃圾回收的时候,key会被清理掉,而value不会被清理掉。这样ThreadLocalMap就会出现key为null的Entry。假如不做任何措施的话,value永远无法被GC回收,可能会产生内存泄漏。ThreadLocalMap实现中已经考虑到这个情况,在调用set(),get(),removr()方法时,会清理掉key为null的记录。使用完ThreadLocal方法后,最好手动调用remove()方法
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
弱引用:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它 所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象
线程池
目的
池化技术很多,线程池、数据库连接池、Http连接池等都是这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池提供一种限制和管理资源(包括执行一个任务)。每个线程池还维护一些基本统计信息,例如已完成任务的数量
优点:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高相应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅消耗系统资源,还降低系统稳定性,使用线程池进行同意的分配,调优和监控
实现Runnable接口和Callable接口的区别
Runnable在Java1.0就一直存在,Callable仅在Java1.5中引入,目的是为了处理Runnable不支持的用例。Runnable接口不会返回结果或抛出检查异常,但Callable接口可以。所以不需要返回结果或抛出异常的推荐使用Runnable
工具类Executors可以实现将Runnable对象转换成Callable对象.Executors.callable(Runnable tash)
或Executors.callable(Runnable task,Object result)
Runnable
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}
Callable
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
execute() 和 submit()
- execute():用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
- submit():用于提交需要返回值的任务,线程池会返回一个Future类型对象,通过这个对象可以判断任务是否执行成功,并且通过Future的get()方法来获取返回值,get()方法回阻塞当前线程直到任务完成,get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
newTaskFor方法返回一个FutureTask对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
execute()
public void execute(Runnable command) {
...
}
创建线程池
《阿里巴巴 Java 开发手册》中强制线程池不允许使用Executors去创建,需要通过ThreadPoolExecutor,这样的处理让写的同学更加明确线程池的运行规则,规避资源耗尽风险
Executors返回线程池对象的弊端:
- FixedThreadPool和SingleThreadExecutor:允许请求的队列长度为Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM
- CachedThreadPool 和 ScheduledThreadPool:允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
- 通过构造方法
- 通过Executor框架的工具类Executors来实现
可以创造三种类型的ThreadPoolExecutor:
- FixedThreadPool:该方法返回一个固定线程数量的线程池,该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
- SingleThreadExecutor:方法返回一个只有一个线程的线程池,若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务
- CachedThreadExecutor:该方法返回一个可根据实现情况调整线程数量的线程池,线程池的数量不确定,但若有空闲的 线程可以复用,则会优先使用复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,返回线程池进行复用。
ThreadPoolExecutor类分析
ThreadPoolExecutor提供了四个构造方法。
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:核心线程数定义了最小可以同时运行的线程数量
- maximumPoolSize:当队列中存放的任务达到队列容量时,当前可以同时运行的线程数量变为最大线程数
- workQueue:当心任务来时,会判断当前运行的线程数量是否达到核心线程数,如果达到,新任务则会被存放到队列中
- keepAliveTime:当线程池中的线程数大于corePoolSize时,如果没有新任务提交,核心线程外的线程不会立即销毁,而是等待,等待时间超过keepAliveTime后才会被回收销毁
- unit:keepAliveTime参数的时间单位
- threadFactory:executor创建新线程会用到
- handler:饱和策略
ThreadPoolExecutor饱和策略
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor定义了一些策略:
- ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException来拒绝新任务的处理。
- ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略
- ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉
- ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。
Spring通过ThreadPoolTaskExecutor或者我们直接通过ThreadPoolExecutor的构造函数创建线程池的时候,当我们不指定RejectedExecutionHandler饱和策略的话来配置线程池的时候默认使用ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor将抛出RejectedExecutionException来拒绝新任务,这代表将丢弃对这个任务的处理。对于可伸缩的应用程序,建议使用ThreadPoolExecutor.CallerRunsPolicy.当最大池被填满时,此策略为我们提供可伸缩队列。
线程池demo
package leetcode.test;
import java.util.Date;
public class MyRunnable implements Runnable {
private String command;
public MyRunnable (String s){
this.command = s;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
使用ThreadPoolExecutor构造函数自定义参数的方式创建线程池
package leetcode.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
// 核心线程数 : 最下可运行的线程数量
private static final int CORE_POOL_SIZE = 5;
// 最大线程数
private static final int MAX_POOL_SIZE = 10;
// 队列容量
private static final int QUEUE_CAPACITY = 100;
// 存活时间
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[]args){
// 使用阿里巴巴推荐的创建线程池的方法
// 通过ThreadPoolExecutor构造函数 自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
// 任务队列
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
// 饱和策略 CallerRunsPolicy
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i=0;i<10;i++) {
// 创建WorkerThread对象(WorkerThread类实现了Runnable接口)
Runnable worker = new MyRunnable(" " + i);
// 执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()){
}
System.out.println("Finished all threads");
}
}
线程池原理分析
通过上面代码可以看出:线程池首先会执行5个任务,然后这些任务有任务被执行完的话,就拿新任务执行。
execute()方法。使用executor.execute(worker)
来提交一个任务到线程池中。
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
addWorker这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则返回false.
// 全局锁
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
* @return 添加成功就返回true否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池中线程的数量
int wc = workerCountOf(c);
// core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
// firstTask == null证明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。
Atomic 原子类
Atomic指一个操作是不可中断的,即使在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
所谓原子类就是具有原子/原子操作特征的类
并发包java.util.concurrent
的原子类都存放在java.util.concurrent.atomic
JUC 包中的原子类
基本类型
使用原子的方式更新基本类型
AtomicInteger
:整型原子类AtomicLong
:长整型原子类AtomicBoolean
:布尔型原子类
数组类型
使用原子的方式更新数组里的某个元素
AtomicIntegerArray
:整型数组原子类AtomicLongArray
:长整型数组原子类AtomicReferenceArray
:引用类型数组原子类
引用类型
AtomicReference
:引用类型原子类AtomicStampedReference
:原子更新带有版本号的引用类型。
该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题AtomicMarkableReference
:原子更新带有标记位的引用类型
对象的属性修改类型
AtomicIntegerFieldUpdater
:原子更新整型字段的更新器AtomicLongFieldUpdater
:原子更新长整型字段的更新器AtomicReferenceFieldUpdater
:原子更新引用类型的更新器
AtomicInteger使用
常用方法
public final int get(); //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
使用AtomicInteger之后,不用对increment()方法加锁也可保证线程安全
public class AtomicIntegerTest {
private AtomicInteger count = new AtomicInteger();
// 使用AtomicInteger之后,不需要对该方法加锁,也可以实现线程安全。
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
AtomicInteger类 原理
AtomicInteger线程安全原理
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
AtomicInteger类 主要利用CAS(compare and swap) + volatile + native方法来保证原子操作,从而避免synchronized的高开销,执行效率大大提升。
CAS原理:拿一个期望的值和原本的值进行比较,如果相同则更新成新的值。UnSafe类的objectFieldOffset()方法是一个本地方法,这个方法是拿“原来的值”的内存地址,返回值是valueOffset。另外value是一个volatile变量,在内存中可见,因此JVM可以保证任何时刻任何线程总能拿到该变量的最新值。
AQS
介绍
AQS(AbstractQueuedSynchronizer). 这个类在java.util.concurrent.locks
包下面
AQS:是一个用来构建锁和同步器的框架,使用AQS能简单且高效的构造出应用广泛大量的同步器,比如提高的ReentrantLock
,Semaphore
,其他的ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等都是基于AQS。
AQS原理分析
AQS核心思想:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果将请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息通过protected类型的getState,setState,compareAndSetState进行操作
//返回同步状态的当前值
protected final int getState() {
return state;
}
//设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS对资源的共享方法
AQS定义两种资源共享方式
- Exclusive(独占):只有一个线程能执行,如
ReentrantLock
。又可以分为公平锁和非公平锁- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
- Share(共享):多个线程可以同时执行,如
CountDownLatch
、Semaphore
、CyclicBarrier
、ReadWriteLock
ReentrantReadWriteLock
可以看成是组合式,因为 ReentrantReadWriteLock
也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需实现共享资源state的获取与释放方式即可。至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
AQS底层模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是:
- 使用者继承
AbstractQueueSynchronizer
并重写指定的方法。(重写对共享资源state的获取和释放) - 将AQS组合在自定义同步组件的实现中,调用其模板方法,而这些模板方法会调用使用者重写的方法
AQS使用模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException。这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获取该锁。当然释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入概念。获取多少次就要释放多少次,这样才能保证state是能回到零态的
再以CountDownLatch为例,任务分为N个子线程去执行,state也初始化为N(N需要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDoen()一次,state会CAS(compare and swap)减一,等到所有子线程都执行完后state=0,会unpark()主调用线程,主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
AQS组件总结
Semaphore
(信号量)-允许多个线程同时访问:synchronized和ReentrantLock都是一次只允许一个线程访问某个资源CountDownLatch
(倒计时器):CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某个线程等待直到倒计时结束,再开始执行。CyclicBarrier
(循环栅栏):CyclicBarrier和CountDownLatch非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
CountDownLatch
CountDownLatch 的作用就是 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
之前在项目中,有一个使用多线程读取多个文件处理的场景,我用到了 CountDownLatch 。具体场景是下面这样的:
我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。
为此我们定义了一个线程池和 count 为 6 的CountDownLatch对象 。使用线程池处理读取任务,每一个线程处理完之后就将 count-1,调用CountDownLatch对象的 await()方法,直到所有文件读取完之后,才会接着执行后面的逻辑。
public class CountDownLatchTest {
/**
* CountDownLatch:允许count个线程阻塞在同一个地方,直到所有线程都完成任务
*/
// 处理文件的数量
private static final int threadCount = 6;
public static void main(String []args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(推荐使用构造方法创建)
ExecutorService threadPool = Executors.newFixedThreadPool(10);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i=0;i<threadCount;i++) {
final int threadNum = i;
threadPool.execute(()->{
// 处理文件业务操作
try{
System.out.println( threadNum + " 作业已完成");
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
}
改进:使用CompletableFuture
类来改进!。Java8的CompletableFuture提供了很多对多线程友好的方法,使用它可以方便的为编写多线程程序。
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);
try {
headerFuture.join();
} catch (Exception ex) {
//......
}
System.out.println("all done. ");
当任务过多的时候,可以考虑通过循环来添加任务。
//文件夹位置
List<String> filePaths = Arrays.asList(...)
// 异步处理所有文件
List<CompletableFuture<String>> fileFutures = filePaths.stream()
.map(filePath -> doSomeThing(filePath))
.collect(Collectors.toList());
// 将他们合并起来
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
fileFutures.toArray(new CompletableFuture[fileFutures.size()])
);
参考
- 本文链接:https://wentianhao.github.io/2021/08/11/%E5%B9%B6%E5%8F%91%E8%BF%9B%E9%98%B6/
- 版权声明:本博客所有文章除特别声明外,均默认采用 许可协议。
若没有本文 Issue,您可以使用 Comment 模版新建。