并发编程已完结,章节如下: Java 并发编程上篇 -(Synchronized 原理、LockSupport 原理、ReentrantLock 原理) Java 并发编程中篇 -(JMM、CAS 原理、Volatile 原理) Java 并发编程下篇 -(线程池) Java 并发编程下篇 -(JUC、AQS 源码、ReentrantLock 源码) 学习视频
一、基本概念 1、进程与线程 进程
程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的。
当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
线程
一个进程之内可以分为一到多个线程。
一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行 。
Java 中,线程作为小调度单位,进程作为资源分配的最小单位。 在 windows 中进程是不活动的,只是作 为线程的容器
两者对比
进程基本上相互独立的,而线程存在于进程内,是进程的一个子集 进程拥有共享的资源,如内存空间等,供其内部的线程共享
进程间通信较为复杂 同一台计算机的进程通信称为 IPC(Inter-process communication)
不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
2、并发与并行 并发是一个CPU在不同的时间去不同线程中执行指令。 并行是多个CPU同时处理不同的线程。 引用 Rob Pike 的一段描述:
并发(concurrent)是同一时间应对(dealing with)多件事情的能力
并行(parallel)是同一时间动手做(doing)多件事情的能力
3、应用 同步和异步的概念 以调用方的角度讲,如果
需要等待结果返回才能继续运行的话就是同步
不需要等待就是异步
1)设计 多线程可以使方法的执行变成异步的,比如说读取磁盘文件时,假设读取操作花费了5秒,如果没有线程的调度机制,那么 cpu 只能等 5 秒,啥都不能做。
结论 比如在项目中,视频文件需要转换格式等操作比较费时,这时开一个新线程处理视频转换,避免阻塞主线程 tomcat 的异步 servlet 也是类似的目的,让用户线程处理耗时较长的操作,避免阻塞 tomcat 的工作线程 ui 程序中,开线程进行其他操作,避免阻塞 ui 线程
结论 1) 单核 cpu 下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换,不同线程轮流使用 cpu ,不至于一个线程总占用 cpu,别的线程没法干活 2)多核 cpu 可以并行跑多个线程,但能否提高程序运行效率还是要分情况的
3)IO 操作不占用 cpu,只是我们一般拷贝文件使用的是【阻塞 IO】,这时相当于线程虽然不用 cpu,但需要一 直等待 IO 结束,没能充分利用线程。所以才有后面的【非阻塞 IO】和【异步 IO】优化
二、Java 线程 1、线程创建与运行 方法一,使用 Thread
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) { Thread t = new Thread ("t1" ) { @Override public void run () { log.debug("running" ); } }; t.start(); log.debug("running" ); }
方法二,使用 Runnable 配合 Thread(推荐)
1 2 3 4 5 6 public static void main (String[] args) { Runnable r = () -> log.debug("running" ); new Thread (r, "t1" ).start(); }
比较方法一和方法二: 方法 1 是把线程和任务合并在了一起 方法 2 是把线程和任务分开了,用 Runnable 更容易与线程池等高级 API 配合,用 Runnable 让任务类脱离了 Thread 继承体系,更灵活。 通过查看源码可以发现,方法二其实还是通过使用 Thread 类中的 run 方法执行的!方法三,FutureTask 配合 Thread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> future = new FutureTask <Integer>(() -> { log.debug("running..." ); Thread.sleep(2000 ); return 100 ; }); Thread t1 = new Thread (future, "t1" ); t1.start(); log.debug("{}" , future.get()); }
Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface Future <V> { boolean cancel (boolean mayInterruptIfRunning) ; V get () throws InterruptedException, ExecutionException; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; boolean isCancelled () ; boolean isDone () ; } 123456789101112
FutureTask 类是 Future 接口和 Runable 接口的实现弥补 runnable 创建线程没有返回值的缺陷,点这里了解
2、线程运行原理 栈与栈帧 拟机栈描述的是Java方法执行的内存模型: 每个方法被执行的时候都会同时创建一个栈帧(stack frame)用于存储局部变量表、操作数栈、动态链接、方法出口等信息,是属于线程的私有的。 当java中使用多线程时,每个线程都会维护它自己的栈帧!每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法,当方法执行完会来到栈帧中的方法出口地址位置,然后从栈中 pop 出栈帧。线程上下文切换(Thread Context Switch) 因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码被动原因:
线程的 cpu 时间片用完(每个线程轮流执行,看前面并行的概念)
垃圾回收
有更高优先级的线程需要运行
主动原因:
线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的。
3、Thread 的常见方法
方法名
static
功能说明
注意
start()
启动一个新线程,在新线程中运行 run 方法中的代码
start 方法只是让线程进入就绪状态,里面代码不一定立刻运行,只有当 CPU 将时间片分给线程时,才能进入运行状态,执行代码。每个线程的 start 方法只能调用一次,调用多次就会出现 IllegalThreadStateException
run()
新线程启动会调用的方法
如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象,来覆盖默认行为
join()
等待线程运行结束
join(long n)
等待线程运行结束,最多等待 n 毫秒
getId()
获取线程长整型的 id
id 唯一
getName()
获取线程名
setName(String)
修改线程名
getPriority()
获取线程优先级
setPriority(int)
修改线程优先级
java中规定线程优先级是1~10 的整数,较大的优先级能提高该线程被 CPU 调度的机率
getState()
获取线程状态
Java 中线程状态是用 6 个 enum 表示,分别为:NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED
isInterrupted()
判断是否被打断
不会清除 打断标记
isAlive()
线程是否存活(还没有运行完毕)
interrupt()
打断线程
如果被打断线程正在 sleep,wait,join 会导致被打断的线程抛出 InterruptedException,并清除 打断标记 ;如果打断的正在运行的线程,则会设置 打断标记,park 的线程被打断,也会设置 打断标记
interrupted()
static
判断当前线程是否被打断
会清除 打断标记
currentThread()
static
获取当前正在执行的线程
sleep(long n)
static
让当前执行的线程休眠n毫秒,休眠时让出 cpu 的时间片给其它线程
yield()
static
提示线程调度器让出当前线程对CPU的使用
主要是为了测试和调试
1)start() VS run() 直接调用 run() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) { Thread t1 = new Thread (new Runnable () { @Override public void run () { log.info(Thread.currentThread().getName() + " running...." ); } }, "t1" ); t1.run(); log.info(Thread.currentThread().getName() + " running..." ); } 1234567891011121314
结果
1 2 3 14 :56 :56 [main] c.Code_05_Test - main running....14 :56 :56 [main] c.Code_05_Test - main running...12
调用 start() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) { Thread t1 = new Thread (new Runnable () { @Override public void run () { log.info(Thread.currentThread().getName() + " running...." ); } }, "t1" ); t1.start(); log.info(Thread.currentThread().getName() + " running..." ); } 12345678910111213141516
结果
1 2 3 4 14 :59 :35 [main] c.Code_05_Test - main running...14 :59 :35 [t1] c.Code_05_Test - t1 running....123
发现两种结果是不一样的,使用 start 方式,CPU 会为创建的线程分配时间片,线程进入运行状态,然后线程调用 run 方法执行逻辑。直接使用 run 的方式,虽然会创建了线程,但是它是直接调用方法,而不是像 start 方式那样触发的,这个线程对象会处一直处在新建状态,从结果上也可以看出,run 方法是 main 线程调用,而不是 t1 线程。
2)sleep()与yield() sleep (使线程阻塞)
调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞),可通过state()方法查看
其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
睡眠结束后的线程未必会立刻得到执行
建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
yield (让出当前线程)
调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态(仍然有可能被执行),然后调度执行其它线程
具体的实现依赖于操作系统的任务调度器
线程优先级 线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它 如果 cpu 比较忙,那么优先级高的线程会获得更多的时间片,但 cpu 闲时,优先级几乎没作用
3)join() 方法 用于等待某个线程结束。哪个线程内调用join()方法,就等待哪个线程结束,然后再去执行其他线程。 如在主线程中调用ti.join(),则是主线程等待t1线程结束,join 采用同步。
1 2 3 4 5 6 Thread t1 = new Thread ();t1.join(); t1.join(1000 ); 12345
4)interrupt() 方法 interrupt 打断线程有两种情况,如下:
如果一个线程在在运行中被打断,打断标记会被置为 true 。
如果是打断因sleep wait join 方法而被阻塞的线程,会将打断标记置为 false 。
isInterrupted() 与 interrupted() 比较,如下: 首先,isInterrupted 是实例方法,interrupted 是静态方法,它们的用处都是查看当前打断的状态,但是 isInterrupted 方法查看线程的时候,不会将打断标记清空,也就是置为 false,interrupted 查看线程打断状态后,会将打断标志置为 false,也就是清空打断标记,简单来说,interrupt() 方法类似于 setter 设置中断值,isInterrupted() 类似于 getter 获取中断值,interrupted() 类似于 getter + setter 先获取中断值,然后清除标志。 用代码测试如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Slf4j(topic = "c.Code_14_Test") public class Code_14_Test { public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { log.info("park" ); LockSupport.park(); log.info("unpark" ); log.info("打断标记为:{}" , Thread.interrupted()); LockSupport.park(); log.info("unpark" ); }, "t1" ); t1.start(); Thread.sleep(1000 ); t1.interrupt(); } } 1234567891011121314151617181920212223242526272829303132333435363738394041
终止模式之两阶段终止模式,如下: Two Phase Termination,就是考虑在一个线程T1中如何优雅地终止另一个线程T2?这里的优雅指的是给T2一个料理后事的机会(如释放锁)。 代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Slf4j(topic = "c.Code_13_Test") public class Code_13_Test { public static void main (String[] args) throws InterruptedException { TwoParseTermination twoParseTermination = new TwoParseTermination (); twoParseTermination.start(); Thread.sleep(3500 ); twoParseTermination.stop(); } } @Slf4j(topic = "c.TwoParseTermination") class TwoParseTermination { private Thread monitor; public void start () { monitor = new Thread (() -> { while (true ) { Thread thread = Thread.currentThread(); if (thread.isInterrupted()) { log.info("料理后事 ..." ); break ; } else { try { Thread.sleep(1000 ); log.info("执行监控的功能 ..." ); } catch (InterruptedException e) { log.info("设置打断标记 ..." ); thread.interrupt(); e.printStackTrace(); } } } }, "monitor" ); monitor.start(); } public void stop () { monitor.interrupt(); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
5)sleep,yiled,wait,join 对比 参考文章:点这里
6)守护线程 默认情况下,java进程需要等待所有的线程结束后才会停止,但是有一种特殊的线程,叫做守护线程,在其他线程全部结束的时候即使守护线程还未结束代码未执行完java进程也会停止。普通线程t1可以调用 t1.setDeamon(true); 方法变成守护线程。
1 2 注意 垃圾回收器线程就是一种守护线程 Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等 待它们处理完当前请求 1
4、线程状态 1)线程的 5 种状态 从操作系统层划分,线程有 5 种状态
初始状态,仅仅是在语言层面上创建了线程对象,即Thead thread = new Thead();,还未与操作系统线程关联
可运行状态,也称就绪状态,指该线程已经被创建,与操作系统相关联,等待cpu给它分配时间片就可运行
运行状态,指线程获取了CPU时间片,正在运行 当CPU时间片用完,线程会转换至【可运行状态】,等待 CPU再次分配时间片,会导致我们前面讲到的上下文切换
阻塞状态 1. 如果调用了阻塞API,如BIO读写文件,那么线程实际上不会用到CPU,不会分配CPU时间片,会导致上下文切换,进入【阻塞状态】 2. 等待BIO操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】 3. 与【可运行状态】的区别是,只要操作系统一直不唤醒线程,调度器就一直不会考虑调度它们,CPU就一直不会分配时间片
终止状态,表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态
2)线程的 6 种状态 这是从 Java API 层面来描述的,我们主要研究的就是这种。可以参考文章,点这里
NEW 跟五种状态里的初始状态是一个意思
RUNNABLE 是当调用了 start() 方法之后的状态,注意,Java API 层面的 RUNNABLE 状态涵盖了操作系统层面的【可运行状态】、【运行状态】和【io阻塞状态】(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为是可运行)
BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分。
演示线程的 6 种状态,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 @Slf4j(topic = "c.Code_15_Test") public class Code_15_Test { public static void main (String[] args) { Thread t1 = new Thread (() -> { log.info("NEW 状态" ); }, "t1" ); Thread t2 = new Thread (() -> { while (true ) { } }, "t2" ); t2.start(); Thread t3 = new Thread (() -> { log.info("running" ); }, "t3" ); t3.start(); Thread t4 = new Thread (() -> { synchronized (Code_15_Test.class) { try { Thread.sleep(100000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t4" ); t4.start(); Thread t5 = new Thread (() -> { try { t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t5" ); t5.start(); Thread t6 = new Thread (() -> { synchronized (Code_15_Test.class) { try { Thread.sleep(100000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t6" ); t6.start(); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t1 线程状态: {}" , t1.getState()); log.info("t2 线程状态: {}" , t2.getState()); log.info("t3 线程状态: {}" , t3.getState()); log.info("t4 线程状态: {}" , t4.getState()); log.info("t5 线程状态: {}" , t5.getState()); log.info("t6 线程状态: {}" , t6.getState()); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
结论 本章的重点在于掌握 1)线程的创建 2)线程重要的 API,如 start、run、sleep、yield、join、interrupt 等 3)线程的状态 4)原理方面,线程的运行流程,栈、栈帧、上下文切换、程序计数器等知识。 5)Thread 两种创建线程的源码 6)使用 interrupt 来编写两阶段终止
三、共享模型之管程 1、线程共享带来的问题 线程出现问题的根本原因是因为线程上下文切换,导致线程里的指令没有执行完就切换执行其它线程了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static int count = 0 ; public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { for (int i = 1 ;i < 5000 ; i++){ count++; } }); Thread t2 = new Thread (() -> { for (int i = 1 ;i < 5000 ; i++){ count--; } }); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("count的值是{}" ,count); } 123456789101112131415161718
如上代码,当执行 count++ 或者 count– 操作的时候,从字节码分析,实际上是 4 步操作。
1 2 3 4 5 6 7 8 9 10 11 12 count++; getstatic i iconst_1 iadd putstatic i count--; getstatic i iconst_1 isub putstatic i 1234567891011
当 CPU 时间片分给 t1 线程时,t1 线程去读取变量值为 0 并且执行 ++ 的操作,如上在字节码自增操作中,当 t1 执行完自增,还没来得急将修改后的值存入静态变量时,假如线程的时间片用完了,并且 CPU 将时间片分配给 t2 线程,t2 线程拿到时间片执行自减操作,并且将修改后的值存入静态变量,此时 count 的值为 -1,但是当 CPU 将时间片分给经历了上下文切换的 t1 线程时,t1 将修改后的值存入静态变量,此时 counter 的值为 1,覆盖了 t2 线程执行的结果,出现了丢失更新,这就是多线对共享资源读取的问题。1)临界区 Critical Section
一个程序运行多个线程本身是没有问题的
问题出在多个线程访问共享资源
多个线程读共享资源其实也没有问题
在多个线程对共享资源读写操作时发生指令交错,就会出现问题
一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区 例如,下面代码中的临界区
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static int counter = 0 ; static void increment () { counter++; } static void decrement () { counter--; } 12345678910111213
2)竞态条件 Race Condition 多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
2、synchronized 解决方案 1)解决手段 为了避免临界区中的竞态条件发生,由多种手段可以达到。
阻塞式解决方案:synchronized ,Lock
非阻塞式解决方案:原子变量
现在讨论使用 synchronized 来进行解决,即俗称的对象锁 ,它采用互斥的方式让同一时刻至多只有一个线程持有对象锁,其他线程如果想获取这个锁就会阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。
2)synchronized 语法 1 2 3 4 synchronized (对象) { } 123
3)synchronized 加在方法上
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Test { public synchronized void test () { } public void test () { synchronized (this ) { } } } 123456789101112
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Test { public synchronized static void test () { } public void test () { synchronized (Test.class) { } } } 123456789101112
3、变量的线程安全分析 1)成员变量和静态变量的线程安全分析
如果变量没有在线程间共享,那么线程对该变量操作是安全的
如果变量在线程间共享
如果只有读操作,则线程安全
如果有读写操作,则这段代码就是临界区,需要考虑线程安全问题
2)部变量线程安全分析
局部变量【局部变量被初始化为基本数据类型】是安全的
局部变量是引用类型或者是对象引用则未必是安全的
如果局部变量引用的对象没有引用线程共享的对象,那么是线程安全的
如果局部变量引用的对象引用了一个线程共享的对象,那么要考虑线程安全问题
3)线程安全的情况 局部变量被初始化为基本数据类型是安全的,代码如下,因为每个线程都会有一份 test() 放在线程私有的栈中,多个线程就有多个,是不被多个线程共享的,所有就没有线程安全问题。
1 2 3 4 5 public static void test () { int i = 10 ; i++; } 1234
4)线程不安全的情况 如果局部变量引用的对象逃离方法的范围,那么要考虑线程安全的,分析如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Slf4j(topic = "c.Code_18_Test") public class Code_18_Test { public static void main (String[] args) { UnsafeTest unsafeTest = new UnsafeTest (); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { unsafeTest.method1(); }, "t" + i).start(); } } } class UnsafeTest { List<Integer> list = new ArrayList <>(); public void method1 () { for (int i = 0 ; i < 200 ; i++) { method2(); method3(); } } public void method2 () { list.add(1 ); } public void method3 () { list.remove(0 ); } } 12345678910111213141516171819202122232425262728293031323334
5)不安全原因分析 如图所示,因为 list 是实例变量,则多个线程都会使用到这个共享的实例变量,就会出现线程安全问题,为什么会有安全问题呢,首先要理解 list 添加元素的几步操作,第一步会获取添加元素的下标 index,第二步对指定的 index 位置添加元素,第三步将 index 往后移。 当 t0 线程从 list 拿到 index = 0 后,t0 线程的时间片用完,出现上下文切换,t1 获取时间片开始执行,从 list 也拿到 index = 0,然后将元素添加到 index 位置,然后将 index 值加 1,然后 t0 线程获取时间片,对 index = 0 位置添加元素,此时 index = 0 已经存在元素,就会出现报错。
6)解决方法 可以将 list 修改成局部变量,然后将 list 作为引用传入方法中,因为局部变量是每个线程私有的,不会出现共享问题,那么就不会有上述问题了。修改的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class SafeTest { public void method1 () { List<Integer> list = new ArrayList <>(); for (int i = 0 ; i < 200 ; i++) { method2(list); method3(list); } } public void method2 (List<Integer> list) { list.add(1 ); } public void method3 (List<Integer> list) { list.remove(0 ); } } 123456789101112131415161718192021
7)思考 private 或 final的重要性 在上诉代码中,其实存在线程安全的问题,因为 method2,method3 方法都是用 public 声明的,如果一个类继承 SafeTest 类,对 method2,method3 方法进行了重写,比如重写 method3 方法,代码如下:
1 2 3 4 5 6 7 8 9 10 class UnsafeSubTest extends UnsafeTest { @Override public void method3 (List<Integer> list) { new Thread (() -> { list.remove(0 ); }).start(); } } 123456789
可以看到重写的方法中又使用到了线程,当主线程和重写的 method3 方法的线程同时存在,此时 list 就是这两个线程的共享资源了,就会出现线程安全问题,我们可以用 private 访问修饰符解决此问题,代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class ThreadSafe { public final void method1 (int loopNumber) { ArrayList<String> list = new ArrayList <>(); for (int i = 0 ; i < loopNumber; i++) { method2(list); method3(list); } } private void method2 (ArrayList<String> list) { list.add("1" ); } private void method3 (ArrayList<String> list) { list.remove(0 ); } } class ThreadSafeSubClass extends ThreadSafe { @Override public void method3 (ArrayList<String> list) { new Thread (() -> { list.remove(0 ); }).start(); } } 1234567891011121314151617181920212223
从这个例子可以看出 private 或 final 提供【安全】的意义所在,请体会开闭原则中的【闭】。
8)常见线程安全类
String
Integer
StringBuffer
Random
Vector (List的线程安全实现类)
Hashtable (Hash的线程安全实现类)
java.util.concurrent 包下的类
这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时 ,是线程安全的。如:
1 2 3 4 5 6 7 8 Hashtable table = new Hashtable ();new Thread (()->{ table.put("key1" , "value1" ); }).start(); new Thread (()->{ table.put("key2" , "value2" ); }).start(); 1234567
线程安全类方法的组合 但注意它们多个方法的组合不是原子的,看如下代码
1 2 3 4 5 6 Hashtable table = new Hashtable ();if ( table.get("key" ) == null ) { table.put("key" , value); } 12345
如上图所示,当使用方法组合时,出现了线程安全问题,当线程 1 执行完 get(“key”) ,这是一个原子操作没出问题,但是在 get(“key”) == null 比较时,如果线程的时间片用完了,线程 2 获取时间片执行了 get(“key”) == null 操作,然后进行 put(“key”, “v2”) 操作,结束后,线程 1 被分配 cpu 时间片继续执行,执行 put 操作就会出现线程安全问题。
不可变类的线程安全 String和Integer类都是不可变的类,因为其类内部状态是不可改变的,因此它们的方法都是线程安全的,有同学或许有疑问,String 有 replace,substring 等方法【可以】改变值啊,其实调用这些方法返回的已经是一个新创建的对象了!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public String substring (int beginIndex, int endIndex) { if (beginIndex < 0 ) { throw new StringIndexOutOfBoundsException (beginIndex); } if (endIndex > value.length) { throw new StringIndexOutOfBoundsException (endIndex); } int subLen = endIndex - beginIndex; if (subLen < 0 ) { throw new StringIndexOutOfBoundsException (subLen); } return ((beginIndex == 0 ) && (endIndex == value.length)) ? this : new String (value, beginIndex, subLen); } 1234567891011121314
示例分析-是否线程安全 示例一: 分析线程是否安全,先对类的成员变量,类变量,局部变量进行考虑,如果变量会在各个线程之间共享,那么就得考虑线程安全问题了,如果变量A引用的是线程安全类的实例,并且只调用该线程安全类的一个方法,那么该变量A是线程安全的的。下面对实例一进行分析:此类不是线程安全的,MyAspect切面类只有一个实例,成员变量start 会被多个线程同时进行读写操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Aspect @Component public class MyAspect { private long start = 0L ; @Before("execution(* *(..))") public void before () { start = System.nanoTime(); } @After("execution(* *(..))") public void after () { long end = System.nanoTime(); System.out.println("cost time:" + (end-start)); } } 1234567891011121314151617
示例二: 此例是典型的三层模型调用,MyServlet UserServiceImpl UserDaoImpl类都只有一个实例,UserDaoImpl类中没有成员变量,update方法里的变量引用的对象不是线程共享的,所以是线程安全的;UserServiceImpl类中只有一个线程安全的UserDaoImpl类的实例,那么UserServiceImpl类也是线程安全的,同理 MyServlet也是线程安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl (); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private UserDao userDao = new UserDaoImpl (); public void update () { userDao.update(); } } public class UserDaoImpl implements UserDao { public void update () { String sql = "update user set password = ? where username = ?" ; try (Connection conn = DriverManager.getConnection("" ,"" ,"" )){ } catch (Exception e) { } } } 1234567891011121314151617181920212223242526
示例三: 跟示例二大体相似,UserDaoImpl类中有成员变量,那么多个线程可以对成员变量conn 同时进行操作,故是不安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl (); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private UserDao userDao = new UserDaoImpl (); public void update () { userDao.update(); } } public class UserDaoImpl implements UserDao { private Connection conn = null ; public void update () throws SQLException { String sql = "update user set password = ? where username = ?" ; conn = DriverManager.getConnection("" ,"" ,"" ); conn.close(); } } 12345678910111213141516171819202122232425
示例四: 跟示例三大体相似,UserServiceImpl类的update方法中 UserDao是作为局部变量存在的,所以每个线程访问的时候都会新建有一个UserDao对象,新建的对象是线程独有的,所以是线程安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl (); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { public void update () { UserDao userDao = new UserDaoImpl (); userDao.update(); } } public class UserDaoImpl implements UserDao { private Connection = null ; public void update () throws SQLException { String sql = "update user set password = ? where username = ?" ; conn = DriverManager.getConnection("" ,"" ,"" ); conn.close(); } } 1234567891011121314151617181920212223
示例五:
1 2 3 4 5 6 7 8 9 10 11 12 public abstract class Test { public void bar () { SimpleDateFormat sdf = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ); foo(sdf); } public abstract foo (SimpleDateFormat sdf) ; public static void main (String[] args) { new Test ().bar(); } } 1234567891011
其中 foo 的行为是不确定的,可能导致不安全的发生,被称之为外星方法,因为 foo 方法可以被重写,导致线程不安全。在 String 类中就考虑到了这一点,String 类是 final 关键字声明的,子类不能重写它的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 public void foo (SimpleDateFormat sdf) { String dateStr = "1999-10-11 00:00:00" ; for (int i = 0 ; i < 20 ; i++) { new Thread (() -> { try { sdf.parse(dateStr); } catch (ParseException e) { e.printStackTrace(); } }).start(); } } 123456789101112
4、Monitor 概念 1)Java 对象头 以 32 位虚拟机为例,普通对象的对象头结构如下,其中的 Klass Word 为指针,指向对应的 Class 对象; 普通对象 数组对象 其中 Mark Word 结构为 所以一个对象的结构如下:
2)Monitor 原理 Monitor 被翻译为监视器或者说管程 每个 java 对象都可以关联一个 Monitor ,如果使用 synchronized 给对象上锁(重量级),该对象头的 Mark Word 中就被设置为指向 Monitor 对象的指针。
刚开始时 Monitor 中的 Owner 为 null
当 Thread-2 执行 synchronized(obj){} 代码时就会将 Monitor 的所有者Owner 设置为 Thread-2,上锁成功,Monitor 中同一时刻只能有一个 Owner
当 Thread-2 占据锁时,如果线程 Thread-3 ,Thread-4 也来执行synchronized(obj){} 代码,就会进入 EntryList(阻塞队列) 中变成BLOCKED(阻塞) 状态
Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的
图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲 wait-notify 时会分析
注意:synchronized 必须是进入同一个对象的 monitor 才有上述的效果不加 synchronized 的对象不会关联监视器,不遵从以上规则
5、synchronized 原理进阶 1)synchronized 用于同步代码块与同步方法原理 参考这篇文章
2)轻量级锁 轻量级锁的使用场景是:如果一个对象虽然有多个线程要对它进行加锁,但是加锁的时间是错开的(也就是没有人可以竞争的),那么可以使用轻量级锁来进行优化。轻量级锁对使用者是透明的,即语法仍然是 synchronized ,假设有两个方法同步块,利用同一个对象加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 static final Object obj = new Object ();public static void method1 () { synchronized ( obj ) { method2(); } } public static void method2 () { synchronized ( obj ) { } } 123456789101112
每次指向到 synchronized 代码块时,都会创建锁记录(Lock Record)对象,每个线程都会包括一个锁记录的结构,锁记录内部可以储存对象的 Mark Word 和对象引用 reference
让锁记录中的 Object reference 指向对象,并且尝试用 cas(compare and sweep) 替换 Object 对象的 Mark Word ,将 Mark Word 的值存入锁记录中。
如果 cas 替换成功,那么对象的对象头储存的就是锁记录的地址和状态 00 表示轻量级锁,如下所示
如果cas失败,有两种情况 1. 如果是其它线程已经持有了该 Object 的轻量级锁,那么表示有竞争,首先会进行自旋锁,自旋一定次数后,如果还是失败就进入锁膨胀阶段。 2. 如果是自己的线程已经执行了 synchronized 进行加锁,那么再添加一条 Lock Record 作为重入的计数。
当线程退出 synchronized 代码块的时候,如果获取的是取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一
当线程退出 synchronized 代码块的时候,如果获取的锁记录取值不为 null,那么使用 cas 将 Mark Word 的值恢复给对象 1. 成功则解锁成功 2. 失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
3)锁膨胀 如果在尝试加轻量级锁的过程中,cas 操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。
当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
这时 Thread-1 加轻量级锁失败,进入锁膨胀流程, + 即为对象申请Monitor锁,让Object指向重量级锁地址 + 然后自己进入Monitor 的EntryList 变成BLOCKED状态
当 Thread-0 退出 synchronized 同步块时,使用 cas 将 Mark Word 的值恢复给对象头,对象的对象头指向 Monitor,那么会进入重量级锁的解锁过程,即按照 Monitor 的地址找到 Monitor 对象,将 Owner 设置为 null ,唤醒 EntryList 中的 Thread-1 线程
4)自旋优化 重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁
自旋重试成功的情况
自旋重试失败的情况,自旋了一定次数还是没有等到持锁的线程释放锁
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。Java 7 之后不能控制是否开启自旋功能
5)偏向锁 在轻量级的锁中,我们可以发现,如果同一个线程对同一个对象进行重入锁时,也需要执行 CAS 操作,这是有点耗时滴,那么 java6 开始引入了偏向锁的东东,只有第一次使用 CAS 时将对象的 Mark Word 头设置为偏向线程 ID,之后这个入锁线程再进行重入锁时,发现线程 ID 是自己的,那么就不用再进行CAS了。分析代码,比较轻量级锁与偏向锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static final Object obj = new Object ();public static void m1 () { synchronized (obj) { m2(); } } public static void m2 () { synchronized (obj) { m3(); } } public static void m3 () { synchronized (obj) { } } 123456789101112131415161718
分析如图:
偏向状态 对象头格式如下: 一个对象的创建过程
如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这是它的 Thread,epoch,age 都是 0 ,在加锁的时候进行设置这些的值.
偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟: -XX:BiasedLockingStartupDelay=0 来禁用延迟
注意:处于偏向锁的对象解锁后,线程 id 仍存储于对象头中
撤销偏向 以下几种情况会使对象的偏向锁失效
调用对象的 hashCode 方法
多个线程使用该对象
调用了 wait/notify 方法(调用wait方法会导致锁膨胀而使用重量级锁)
6)批量重偏向
如果对象虽然被多个线程访问,但是线程间不存在竞争,这时偏向 t1 的对象仍有机会重新偏向 t2
当撤销超过20次后(超过阈值),JVM 会觉得是不是偏向错了,这时会在给对象加锁时,重新偏向至加锁线程。
7)批量撤销 当撤销偏向锁的阈值超过 40 以后,就会将整个类的对象都改为不可偏向的
小结 从JDK1.6开始,synchronized锁的实现发生了很大的变化;JVM引入了相应的优化手段来提升synchronized锁的性能,这种提升涉及到偏向锁,轻量级锁以及重量级锁,从而减少锁的竞争带来的用户态与内核态之间的切换;这种锁的优化实际上是通过java对象头中的一些标志位去实现的;对于锁的访问与改变,实际上都是与java对象头息息相关。
对象实例在堆中会被划分为三个部分:对象头,实例数据与对其填充。对象头也是由三块内容来构成:
Mark Word
指向类的指针
数组长度
其中Mark Word(它记录了对象,锁及垃圾回收的相关信息,在64位的JVM中,其长度也是 64bit 的)的位信息包括如下组成部分:
无锁标记(hashcode、分代年龄、偏向锁标志)
偏向锁标记 (偏向线程 id)
轻量级锁标记 (锁记录)
重量级锁标记 (Monitor)
GC标记
对于 synchronized 锁来说,锁的升级主要是通过 Mark Word 中的锁标记位与是否是偏向锁标记为来达成的;synchronized 关键字所对象的锁都是先从偏向锁开始,随着锁竞争的不断升级,逐步演化至轻量级锁,最后变成了重量级锁。
偏向锁:针对一个线程来说的,主要作用是优化同一个线程多次获取一个锁的情况, 当一个线程执行了一个 synchronized 方法的时候,肯定能得到对象的 monitor ,这个方法所在的对象就会在 Mark Work 处设为偏向锁标记,还会有一个字段指向拥有锁的这个线程的线程 ID 。当这个线程再次访问同一个 synchronized 方法的时候,如果按照通常的方法,这个线程还是要尝试获取这个对象的 monitor ,再执行这个 synchronized 方法。但是由于 Mark Word 的存在,当第二个线程再次来访问的时候,就会检查这个对象的 Mark Word 的偏向锁标记,再判断一下这个字段记录的线程 ID 是不是跟第二个线程的 ID 是否相同的。如果相同,就无需再获取 monitor 了,直接进入方法体中。
如果是另一个线程访问这个 synchronized 方法,那么实际情况会如何呢?:偏向锁会被取消掉。
轻量级锁:若第一个线程已经获取到了当前对象的锁,这是第二个线程又开始尝试争抢该对象的锁,由于该对象的锁已经被第一个线程获取到,因此它是偏向锁,而第二个线程再争抢时,会发现该对象头中的 Mark Word 已经是偏向锁,但里面储存的线程 ID 并不是自己(是第一个线程),那么她会进行 CAS(Compare and Swap),从而获取到锁,这里面存在两种情况: 1. 获取到锁成功(一共只有两个线程):那么它会将 Mark Word 中的线程 ID 由第一个线程变成自己(偏向锁标记位保持不表),这样该对象依然会保持偏向锁的状态 2. 获取锁失败(一共不止两个线程):则表示这时可能会有多个线程同时再尝试争抢该对象的锁,那么这是偏向锁就会进行升级,升级为轻量级锁
旋锁,若自旋失败,那么锁就会转化为重量级锁,在这种情况下,无法获取到锁的线程都会进入到 moniter(即内核态),自旋最大的特点是避免了线程从用户态进入到内核态。
6、Wait/Notify 1)原理
锁对象调用wait方法(obj.wait),就会使当前线程进入 WaitSet 中,变为 WAITING 状态。
处于BLOCKED和 WAITING 状态的线程都为阻塞状态,CPU 都不会分给他们时间片。但是有所区别:
BLOCKED 状态的线程是在竞争对象时,发现 Monitor 的 Owner 已经是别的线程了,此时就会进入 EntryList 中,并处于 BLOCKED 状态
WAITING 状态的线程是获得了对象的锁,但是自身因为某些原因需要进入阻塞状态时,锁对象调用了 wait 方法而进入了 WaitSet 中,处于 WAITING 状态
BLOCKED 状态的线程会在锁被释放的时候被唤醒,但是处于 WAITING 状态的线程只有被锁对象调用了 notify 方法(obj.notify/obj.notifyAll),才会被唤醒。
注:只有当对象加锁以后,才能调用 wait 和 notify 方法
2)Wait 与 Sleep 的区别
Sleep 是 Thread 类的静态方法,Wait 是 Object 的方法,Object 又是所有类的父类,所以所有类都有Wait方法。
Sleep 在阻塞的时候不会释放锁,而 Wait 在阻塞的时候会释放锁,它们都会释放 CPU 资源。
Sleep 不需要与 synchronized 一起使用,而 Wait 需要与 synchronized 一起使用(对象被锁以后才能使用)
使用 wait 一般需要搭配 notify 或者 notifyAll 来使用,不然会让线程一直等待。
3)优雅地使用 wait/notify 什么时候适合使用wait
当线程不满足某些条件 ,需要暂停运行时,可以使用 wait 。这样会将对象的锁释放 ,让其他线程能够继续运行。如果此时使用 sleep,会导致所有线程都进入阻塞 ,导致所有线程都没法运行,直到当前线程 sleep 结束后,运行完毕,才能得到执行。使用wait/notify需要注意什么
当有多个线程在运行时,对象调用了 wait 方法,此时这些线程都会进入 WaitSet 中等待。如果这时使用了 notify 方法,可能会造成虚假唤醒 (唤醒的不是满足条件的等待线程),这时就需要使用 notifyAll 方法
1 2 3 4 5 6 7 8 9 10 11 12 synchronized (lock) { while ( lock.wait(); } } synchronized (lock) { lock.notifyAll(); } 1234567891011
4)同步模式之保护性暂停 即 Guarded Suspension,用在一个线程等待另一个线程的执行结果,要点:
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是此模式
因为要等待另一方的结果,因此归类到同步模式
多任务版 GuardedObject 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个生产者和消费者之间是一一对应的关系,但是生产者消费者模式并不是。rpc 框架的调用中就使用到了这种模式。 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 @Slf4j(topic = "c.Code_23_Test") public class Code_23_Test { public static void main (String[] args) { for (int i = 0 ; i < 3 ; i++) { new People ().start(); } try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } for (Integer id : Mailboxes.getIds()) { new Postman (id, "内容 " + id).start(); } } } @Slf4j(topic = "c.People") class People extends Thread { @Override public void run () { GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.info("收信的为 id: {}" , guardedObject.getId()); Object o = guardedObject.get(5000 ); log.info("收到信的 id: {}, 内容: {}" , guardedObject.getId(), o); } } @Slf4j(topic = "c.Postman") class Postman extends Thread { private int id; private String mail; public Postman (int id, String mail) { this .id = id; this .mail = mail; } @Override public void run () { GuardedObject guardedObject = Mailboxes.getGuardedObject(id); log.info("送信的 id: {}, 内容: {}" , id, mail); guardedObject.complete(mail); } } class Mailboxes { private static int id = 1 ; private static Map<Integer, GuardedObject> boxes = new Hashtable <>(); public static synchronized int generateId () { return id++; } public static GuardedObject createGuardedObject () { GuardedObject guardedObject = new GuardedObject (generateId()); boxes.put(guardedObject.getId(), guardedObject); return guardedObject; } public static GuardedObject getGuardedObject (int id) { return boxes.remove(id); } public static Set<Integer> getIds () { return boxes.keySet(); } } class GuardedObject { private int id; public GuardedObject (int id) { this .id = id; } public int getId () { return this .id; } private Object response; public Object get (long timeout) { synchronized (this ) { long begin = System.currentTimeMillis(); long passTime = 0 ; while (response == null ) { long waitTime = timeout - passTime; if (waitTime <= 0 ) { break ; } try { this .wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passTime = System.currentTimeMillis() - begin; } return response; } } public void complete (Object response) { synchronized (this ) { this .response = response; this .notify(); } } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
5)异步模式之生产者/消费者 要点
与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK 中各种阻塞队列,采用的就是这种模式
“异步”的意思就是生产者产生消息之后消息没有被立刻消费,而“同步模式”中,消息在产生之后被立刻消费了。
小结
当调用 wait 时,首先需要确保调用了 wait 方法的线程已经持有了对象的锁(调用 wait 方法的代码片段需要放在 sychronized 块或者时 sychronized 方法中,这样才可以确保线程在调用wait方法前已经获取到了对象的锁)
当调用 wait 时,该线程就会释放掉这个对象的锁,然后进入等待状态 (wait set)
当线程调用了 wait 后进入到等待状态时,它就可以等待其他线程调用相同对象的 notify 或者 notifyAll 方法使得自己被唤醒
一旦这个线程被其它线程唤醒之后,该线程就会与其它线程以同开始竞争这个对象的锁(公平竞争);只有当该线程获取到对象的锁后,线程才会继续往下执行
当调用对象的 notify 方法时,他会随机唤醒对象等待集合 (wait set) 中的任意一个线程,当某个线程被唤醒后,它就会与其它线程一同竞争对象的锁
当调用对象的 notifyAll 方法时,它会唤醒该对象等待集合 (wait set) 中的所有线程,这些线程被唤醒后,又会开始竞争对象的锁
在某一时刻,只有唯一的一个线程能拥有对象的锁
7、park & unpark 1)基本使用 park & unpark 是 LockSupport 线程通信工具类的静态方法。
1 2 3 4 5 LockSupport.park(); LockSupport.unpark; 1234
2)park unpark 原理 每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond 和 _mutex
打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _ cond 就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
调用 park 就是要看需不需要停下来歇息
如果备用干粮耗尽,那么钻进帐篷歇息
如果备用干粮充足,那么不需停留,继续前进
调用 unpark,就好比令干粮充足
如果这时线程还在帐篷,就唤醒让他继续前进
如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
先调用park再调用upark的过程
先调用 park 1. 当前线程调用 Unsafe.park() 方法 2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁(mutex对象有个等待队列 _cond) 3. 线程进入 _cond 条件变量阻塞 4. 设置 _counter = 0
调用 upark 1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1 2. 唤醒 _cond 条件变量中的 Thread_0 3. Thread_0 恢复运行 4. 设置 _counter 为 0
先调用upark再调用park的过程
调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
当前线程调用 Unsafe.park() 方法
检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
设置 _counter 为 0
8、线程状态转换
情况一:NEW –> RUNNABLE 当调用了 t.start() 方法时,由 NEW –> RUNNABLE
情况二: RUNNABLE <–> WAITING
当调用了t 线程用 synchronized(obj) 获取了对象锁后,调用 obj.wait() 方法时,t 线程从 RUNNABLE –> WAITING
调用 obj.notify() , obj.notifyAll() , t.interrupt() 时,会在 WaitSet 等待队列中出现锁竞争,非公平竞争
竞争锁成功,t 线程从 WAITING –> RUNNABLE
竞争锁失败,t 线程从 WAITING –> BLOCKED
情况三:RUNNABLE <–> WAITING
当前线程调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING
t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING –> RUNNABLE
情况四: RUNNABLE <–> WAITING
当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE
情况五: RUNNABLE <–> TIMED_WAITING t 线程用 synchronized(obj) 获取了对象锁后
调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING
t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE
竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED
情况六:RUNNABLE <–> TIMED_WAITING
当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING 注意是当前线程在 t 线程对象的监视器上等待
当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING –> RUNNABLE
情况七:RUNNABLE <–> TIMED_WAITING
当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING
当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE
情况八:RUNNABLE <–> TIMED_WAITING
当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE
情况九:RUNNABLE <–> BLOCKED
t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED
持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED
情况十: RUNNABLE <–> TERMINATED 当前线程所有代码运行完毕,进入 TERMINATED
9、活跃性 1)定义 线程因为某些原因,导致代码一直无法执行完毕,这种的现象叫做活跃性。
2)死锁 有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁 如:t1 线程获得 A 对象锁,接下来想获取 B 对象的锁 t2 线程获得 B 对象锁,接下来想获取 A 对象的锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static void main (String[] args) { final Object A = new Object (); final Object B = new Object (); new Thread (()->{ synchronized (A) { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (B) { } } }).start(); new Thread (()->{ synchronized (B) { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (A) { } } }).start(); } 1234567891011121314151617181920212223242526272829
发生死锁的必要条件
互斥条件 在一段时间内,一种资源只能被一个进程所使用
请求和保持条件 进程已经拥有了至少一种资源,同时又去申请其他资源。因为其他资源被别的进程所使用,该进程进入阻塞状态,并且不释放自己已有的资源
不可抢占条件 进程对已获得的资源在未使用完成前不能被强占,只能在进程使用完后自己释放
循环等待条件 发生死锁时,必然存在一个进程——资源的循环链。
定位死锁的方法 检测死锁可以使用 jconsole工具;或者使用 jps 定位进程 id,再用 jstack 根据进程 id 定位死锁。哲学家就餐问题 有五位哲学家,围坐在圆桌旁。 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。 如果筷子被身边的人拿着,自己就得等待 。 当每个哲学家即线程持有一根筷子时,他们都在等待另一个线程释放锁,因此造成了死锁。这种线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情况。避免死锁的方法 在线程使用锁对象时,顺序加锁 即可避免死锁
3)活锁 活锁出现在两个线程互相改变对方的结束条件 ,谁也无法结束。
避免活锁的方法 在线程执行时,中途给予不同的间隔时间 即可。
死锁与活锁的区别
死锁是因为线程互相持有对象想要的锁,并且都不释放,最后到时线程阻塞,停止运行的现 象。
活锁是因为线程间修改了对方的结束条件,而导致代码一直在运行,却一直运行不完 的现象。
4)饥饿 某些线程因为优先级太低,导致一直无法获得资源的现象。 在使用顺序加锁时,可能会出现饥饿现象
10、ReentrantLock 和 synchronized 相比具有的的特点
可中断
可以设置超时时间
可以设置为公平锁 (先到先得)
支持多个条件变量( 具有多个 WaitSet)
1 2 3 4 5 6 7 8 9 10 11 private ReentrantLock lock = new ReentrantLock ();lock.lock(); try { }finally { lock.unlock(); } 12345678910
1)可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
2)可打断 如果某个线程处于阻塞状态,可以调用其 interrupt 方法让其停止阻塞,获得锁失败 简而言之就是:处于阻塞状态的线程,被打断了就不用阻塞了,直接停止运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static void main (String[] args) { ReentrantLock lock = new ReentrantLock (); Thread t1 = new Thread (() -> { try { lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); return ; }finally { lock.unlock(); } }); lock.lock(); try { t1.start(); Thread.sleep(1000 ); t1.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } 1234567891011121314151617181920212223242526272829
3)锁超时 使用 lock.tryLock 方法会返回获取锁是否成功。如果成功则返回 true ,反之则返回 false 。 并且 tryLock 方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit), 其中 timeout 为最长等待时间,TimeUnit 为时间单位 简而言之就是:获取锁失败了、获取超时了或者被打断了,不再阻塞,直接停止运行。不设置等待时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static void main (String[] args) { ReentrantLock lock = new ReentrantLock (); Thread t1 = new Thread (() -> { if (!lock.tryLock()) { System.out.println("获取失败" ); return ; } System.out.println("得到了锁" ); lock.unlock(); }); lock.lock(); try { t1.start(); Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } 123456789101112131415161718192021222324
设置等待时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public static void main (String[] args) { ReentrantLock lock = new ReentrantLock (); Thread t1 = new Thread (() -> { try { if (!lock.tryLock(1 , TimeUnit.SECONDS)) { System.out.println("获取失败" ); return ; } } catch (InterruptedException e) { e.printStackTrace(); return ; } System.out.println("得到了锁" ); lock.unlock(); }); lock.lock(); try { t1.start(); t1.interrupt(); Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } 123456789101112131415161718192021222324252627282930313233
4)公平锁 在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。
1 2 3 ReentrantLock lock = new ReentrantLock (true );12
5)条件变量 synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待。 ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
synchronized 是那些不满足条件的线程都在一间休息室等消息
而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
使用要点:
await 前需要获得锁
await 执行后,会释放锁,进入 conditionObject 等待
await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
竞争 lock 锁成功后,从 await 后继续执
11、同步模式之顺序控制 线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现。
1)Wait/Notify 实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class Code_32_Test { public static void main (String[] args) { WaitAndNotify waitAndNotify = new WaitAndNotify (1 , 5 ); new Thread (()->{ waitAndNotify.run("a" , 1 , 2 ); }).start(); new Thread (()->{ waitAndNotify.run("b" , 2 , 3 ); }).start(); new Thread (()->{ waitAndNotify.run("c" , 3 , 1 ); }).start(); } } class WaitAndNotify { public void run (String str, int flag, int nextFlag) { for (int i = 0 ; i < loopNumber; i++) { synchronized (this ) { while (flag != this .flag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); this .flag = nextFlag; this .notifyAll(); } } } private int flag; private int loopNumber; public WaitAndNotify (int flag, int loopNumber) { this .flag = flag; this .loopNumber = loopNumber; } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445
2)park/unpary 实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Code_33_Test { public static Thread t1, t2, t3; public static void main (String[] args) { ParkAndUnPark obj = new ParkAndUnPark (5 ); t1 = new Thread (() -> { obj.run("a" , t2); }); t2 = new Thread (() -> { obj.run("b" , t3); }); t3 = new Thread (() -> { obj.run("c" , t1); }); t1.start(); t2.start(); t3.start(); LockSupport.unpark(t1); } } class ParkAndUnPark { public void run (String str, Thread nextThread) { for (int i = 0 ; i < loopNumber; i++) { LockSupport.park(); System.out.print(str); LockSupport.unpark(nextThread); } } private int loopNumber; public ParkAndUnPark (int loopNumber) { this .loopNumber = loopNumber; } } 123456789101112131415161718192021222324252627282930313233343536373839
3)await/signal 实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class Code_34_Test { public static void main (String[] args) { AwaitAndSignal lock = new AwaitAndSignal (5 ); Condition a = lock.newCondition(); Condition b = lock.newCondition(); Condition c = lock.newCondition(); new Thread (() -> { lock.run("a" , a, b); }).start(); new Thread (() -> { lock.run("b" , b, c); }).start(); new Thread (() -> { lock.run("c" , c, a); }).start(); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } lock.lock(); try { a.signal(); }finally { lock.unlock(); } } } class AwaitAndSignal extends ReentrantLock { public void run (String str, Condition current, Condition nextCondition) { for (int i = 0 ; i < loopNumber; i++) { lock(); try { current.await(); System.out.print(str); nextCondition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } } private int loopNumber; public AwaitAndSignal (int loopNumber) { this .loopNumber = loopNumber; } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
结论 本章我们需要重点掌握的是
分析多线程访问共享资源时,哪些代码片段属于临界区
使用 synchronized 互斥解决临界区的线程安全问题
掌握 synchronized 锁对象语法
掌握 synchronzied 加载成员方法和静态方法语法
掌握 wait/notify 同步方法
使用 lock 互斥解决临界区的线程安全问题 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
学会分析变量的线程安全性、掌握常见线程安全类的使用
了解线程活跃性问题:死锁、活锁、饥饿
应用方面
互斥:使用 synchronized 或 Lock 达到共享资源互斥效果,实现原子性效果,保证线程安全。
同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果。
原理方面
monitor、synchronized 、wait/notify 原理
synchronized 进阶原理
park & unpark 原理
模式方面
同步模式之保护性暂停
异步模式之生产者消费者
同步模式之顺序控制
并发编程已完结,章节如下: Java 并发编程上篇 -(Synchronized 原理、LockSupport 原理、ReentrantLock 原理) Java 并发编程中篇 -(JMM、CAS 原理、Volatile 原理) Java 并发编程下篇 -(线程池) Java 并发编程下篇 -(JUC、AQS 源码、ReentrantLock 源码)
四、共享模型之内存 1、Java 内存模型(JMM) JMM 即 Java Memory Model,它定义了主存(共享内存)、工作内存(线程私有)抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。 JMM 体现在以下几个方面
原子性 - 保证指令不会受到线程上下文切换的影响
可见性 - 保证指令不会受 cpu 缓存的影响
有序性 - 保证指令不会受 cpu 指令并行优化的影响
2、可见性 1)退不出的循环 首先看一段代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static boolean run = true ; public static void main (String[] args) { Thread t1 = new Thread (() -> { while (run) { } }, "t1" ); t1.start(); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t1 Stop" ); run = false ; } 1234567891011121314151617181920
首先 t1 线程运行,然后过一秒,主线程设置 run 的值为 false,想让 t1 线程停止下来,但是 t1 线程并没有停,分析如下图:解决方法
使用 volatile (易变关键字)
它可以用来修饰成员变量和静态成员变量(放在主存中的变量),他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存
1 2 public static volatile boolean run = true ; 1
2)可见性与原子性 上面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对volatile 变量的修改对另一个线程可见, 不能保证原子性,仅用在一个写线程,多个读线程的情况。
注意 synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低。
如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了,想一想为什么?
因为 printIn() 方法使用了 synchronized 同步代码块,可以保证原子性与可见性,它是 PrintStream 类的方法。
3)模式之两阶段终止 使用 volatile 关键字来实现两阶段终止模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class Code_02_Test { public static void main (String[] args) throws InterruptedException { Monitor monitor = new Monitor (); monitor.start(); Thread.sleep(3500 ); monitor.stop(); } } class Monitor { Thread monitor; private volatile boolean stop = false ; public void start () { monitor = new Thread () { @Override public void run () { while (true ) { if (stop) { System.out.println("处理后续任务" ); break ; } System.out.println("监控器运行中..." ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { System.out.println("被打断了" ); } } } }; monitor.start(); } public void stop () { stop = true ; monitor.interrupt(); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
4)模式之 Balking Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回,有点类似单例。
用一个标记来判断该任务是否已经被执行过了
需要避免线程安全问题
加锁的代码块要尽量的小,以保证性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class Code_03_Test { public static void main (String[] args) throws InterruptedException { Monitor monitor = new Monitor (); monitor.start(); monitor.start(); Thread.sleep(3500 ); monitor.stop(); } } class Monitor { Thread monitor; private volatile boolean stop = false ; private boolean starting = false ; public void start () { synchronized (this ) { if (starting) { return ; } starting = true ; } monitor = new Thread () { @Override public void run () { while (true ) { if (stop) { System.out.println("处理后续任务" ); break ; } System.out.println("监控器运行中..." ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { System.out.println("被打断了" ); } } } }; monitor.start(); } public void stop () { monitor.interrupt(); stop = true ; } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
3、有序性 1)指令重排 首先看一个例子:
1 2 3 4 5 6 7 8 9 int a = 10 ; int b = 20 ; System.out.println( a + b ); int a = 10 ;int b = a - 5 ;12345678
指令重排简单来说可以,在程序结果不受影响的前提下,可以调整指令语句执行顺序。多线程下指令重排会影响正确性。
2)多线程下指令重排问题 首先看一段代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int num = 0 ;boolean ready = false ; public void actor1 (I_Result r) { if (ready) { r.r1 = num + num; } else { r.r1 = 1 ; } } public void actor2 (I_Result r) { num = 2 ; ready = true ; } 123456789101112131415161718
在多线程环境下,以上的代码 r1 的值有三种情况: 第一种:线程 2 先执行,然后线程 1 后执行,r1 的结果为 4 第二种:线程 1 先执行,然后线程 2 后执行,r1 的结果为 1 第三种:线程 2 先执行,但是发送了指令重排,num = 2 与 ready = true 这两行代码语序发生装换,
1 2 3 ready = true ; num = 2 ; 12
然后执行 ready = true 后,线程 1 运行了,那么 r1 的结果是为 0。
3)解决方法 volatile 修饰的变量,可以禁用指令重排,禁止的是加 volatile 关键字变量之前的代码 重排序
4、volatile 原理 volatile 的底层实现原理是内存屏障 ,Memory Barrier(Memory Fence) 对 volatile 变量的写指令后会加入写屏障 对 volatile 变量的读指令前会加入读屏障
1)如何保证可见性
写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
1 2 3 4 5 6 7 public void actor2 (I_Result r) { num = 2 ; ready = true ; } 123456
而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
1 2 3 4 5 6 7 8 9 10 public void actor1 (I_Result r) { if (ready) { r.r1 = num + num; } else { r.r1 = 1 ; } } 123456789
分析如图:
2)如何保证有序性
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
1 2 3 4 5 6 public void actor2 (I_Result r) { num = 2 ; ready = true ; } 12345
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
1 2 3 4 5 6 7 8 9 10 public void actor1 (I_Result r) { if (ready) { r.r1 = num + num; } else { r.r1 = 1 ; } } 123456789
注意:volatile 不能解决指令交错 写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其它线程的读跑到它前面去。 而有序性的保证也只是保证了本线程内相关代码不被重排序
3)double-checked locking 问题 看如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public final class Singleton { private Singleton () { } private static Singleton INSTANCE = null ; public static Singleton getInstance () { synchronized (Singleton.class) { if (INSTANCE == null ) { INSTANCE = new Singleton (); } } return INSTANCE; } } public final class Singleton { private Singleton () { } private static Singleton INSTANCE = null ; public static Singleton getInstance () { if (INSTANCE == null ) { synchronized (Singleton.class) { if (INSTANCE == null ) { INSTANCE = new Singleton (); } } } return INSTANCE; } } 12345678910111213141516171819202122232425262728293031
以上的实现特点是:
懒惰实例化
首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外 但在多线程环境下,上面的代码是有问题的,getInstance 方法对应的字节码为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 0 : getstatic #2 3 : ifnonnull 37 6 : ldc #3 8 : dup9 : astore_010 : monitorenter11 : getstatic #2 14 : ifnonnull 27 17 : new #3 20 : dup21 : invokespecial #4 24 : putstatic #2 27 : aload_028 : monitorexit29 : goto 37 32 : astore_133 : aload_034 : monitorexit35 : aload_136 : athrow37 : getstatic #2 40 : areturn123456789101112131415161718192021222324252627282930
其中
17 表示创建对象,将对象引用入栈 // new Singleton
20 表示复制一份对象引用 // 复制了引用地址
21 表示利用一个对象引用,调用构造方法 // 根据复制的引用地址调用构造方法
24 表示利用一个对象引用,赋值给 static INSTANCE
也许 jvm 会优化为:先执行 24,再执行 21。如果两个线程 t1,t2 按如下时间序列执行: 关键在于 0: getstatic 这行代码在 monitor 控制之外,它就像之前举例中不守规则的人,可以越过 monitor 读取 INSTANCE 变量的值 这时 t1 还未完全将构造方法执行完毕,如果在构造方法中要执行很多初始化操作,那么 t2 拿到的是将是一个未初 始化完毕的单例 对 INSTANCE 使用 volatile 修饰即可,可以禁用指令重排,但要注意在 JDK 5 以上的版本的 volatile 才会真正有效。
4)double-checked locking 解决 加volatile就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public final class Singleton { private Singleton () { } private static volatile Singleton INSTANCE = null ; public static Singleton getInstance () { if (INSTANCE == null ) { synchronized (Singleton.class) { if (INSTANCE == null ) { INSTANCE = new Singleton (); } } } return INSTANCE; } } 12345678910111213141516
如上面的注释内容所示,读写 volatile 变量操作(即 getstatic 操作和 putstatic 操作)时会加入内存屏障(Memory Barrier(Memory Fence)),保证下面两点:
可见性
写屏障(sfence)保证在该屏障之前的 t1 对共享变量的改动,都同步到主存当中
而读屏障(lfence)保证在该屏障之后 t2 对共享变量的读取,加载的是主存中最新数据
有序性
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
更底层是读写变量时使用 lock 指令来多核 CPU 之间的可见性与有序性
5、happens-before 下面说的变量都是指成员变量或静态成员变量 1)线程解锁 m 之前对变量的写,对于接下来对 m 加锁的其它线程对该变量的读可见
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static int x; static Object m = new Object (); new Thread (()->{ synchronized (m) { x = 10 ; } },"t1" ).start(); new Thread (()->{ synchronized (m) { System.out.println(x); } },"t2" ).start(); 12345678910111213
2)线程对 volatile 变量的写,对接下来其它线程对该变量的读可见
1 2 3 4 5 6 7 8 volatile static int x;new Thread (()->{ x = 10 ; },"t1" ).start(); new Thread (()->{ System.out.println(x); },"t2" ).start(); 1234567
3)线程 start 前对变量的写,对该线程开始后对该变量的读可见
1 2 3 4 5 6 static int x;x = 10 ; new Thread (()->{ System.out.println(x); },"t2" ).start(); 12345
4)线程结束前对变量的写,对其它线程得知它结束后的读可见(比如其它线程调用 t1.isAlive() 或 t1.join()等待它结束)
1 2 3 4 5 6 7 8 static int x;Thread t1 = new Thread (()->{ x = 10 ; },"t1" ); t1.start(); t1.join(); System.out.println(x); 1234567
5)线程 t1 打断 t2(interrupt)前对变量的写,对于其他线程得知 t2 被打断后对变量的读可见(通过 t2.interrupted 或 t2.isInterrupted)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static int x; public static void main (String[] args) { Thread t2 = new Thread (()->{ while (true ) { if (Thread.currentThread().isInterrupted()) { System.out.println(x); break ; } } },"t2" ); t2.start(); new Thread (()->{ sleep(1 ); x = 10 ; t2.interrupt(); },"t1" ).start(); while (!t2.isInterrupted()) { Thread.yield (); } System.out.println(x); } 123456789101112131415161718192021
6)对变量默认值(0,false,null)的写,对其它线程对该变量的读可见 7)具有传递性,如果 x hb-> y 并且 y hb-> z 那么有 x hb-> z ,配合 volatile 的防指令重排,有下面的例子
1 2 3 4 5 6 7 8 9 10 11 volatile static int x; static int y; new Thread (() -> { y = 10 ; x = 20 ; },"t1" ).start(); new Thread (() -> { System.out.println(x); },"t2" ).start(); 12345678910
6、练习 1)balking 模式习题 希望 doInit() 方法仅被调用一次,下面的实现是否有问题,为什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 public class TestVolatile { volatile boolean initialized = false ; void init () { if (initialized) { return ; } doInit(); initialized = true ; } private void doInit () { } } 123456789101112
volatile 可以保存线程的可见性,有序性,但是不能保证原子性,doInit 方法没加锁,可能会被调用多次。2)线程安全单例习题 单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试着分析每种实现下获取单例对象(即调用 getInstance)时的线程安全,并思考注释中的问题
饿汉式:类加载就会导致该单实例对象被创建
懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
实现1: 饿汉式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public final class Singleton implements Serializable { private Singleton () {} private static final Singleton INSTANCE = new Singleton (); public static Singleton getInstance () { return INSTANCE; } public Object readResolve () { return INSTANCE; } } 12345678910111213141516
实现2: 饿汉式
1 2 3 4 5 6 7 8 9 10 enum Singleton { INSTANCE; } 123456789
实现3:懒汉式
1 2 3 4 5 6 7 8 9 10 11 12 13 public final class Singleton { private Singleton () { } private static Singleton INSTANCE = null ; public static synchronized Singleton getInstance () { if ( INSTANCE != null ){ return INSTANCE; } INSTANCE = new Singleton (); return INSTANCE; } } 123456789101112
实现4:DCL 懒汉式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public final class Singleton { private Singleton () { } private static volatile Singleton INSTANCE = null ; public static Singleton getInstance () { if (INSTANCE != null ) { return INSTANCE; } synchronized (Singleton.class) { if (INSTANCE != null ) { return INSTANCE; } INSTANCE = new Singleton (); return INSTANCE; } } } 1234567891011121314151617181920
实现5:静态内部类懒汉式
1 2 3 4 5 6 7 8 9 10 11 12 public final class Singleton { private Singleton () { } private static class LazyHolder { static final Singleton INSTANCE = new Singleton (); } public static Singleton getInstance () { return LazyHolder.INSTANCE; } } 1234567891011
结论 本章重点讲解了 JMM 中的
可见性 - 由 JVM 缓存优化引起
有序性 - 由 JVM 指令重排序优化引起
happens-before 规则
原理方面
volatile
模式方面
两阶段终止模式的 volatile 改进
同步模式之 balking
五、共享模型之无锁 管程即 monitor 是阻塞式的悲观锁实现并发控制,这章我们将通过非阻塞式的乐观锁的来实现并发控制
1、无锁解决线程安全问题 如下代码,通过 synchronized 解决线程安全问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class Code_04_UnsafeTest { public static void main (String[] args) { Account acount = new AccountUnsafe (10000 ); Account.demo(acount); } } class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe (Integer balance) { this .balance = balance; } @Override public Integer getBalance () { return this .balance; } @Override public void withdraw (Integer amount) { synchronized (this ) { this .balance -= amount; } } } interface Account { Integer getBalance () ; void withdraw (Integer amount) ; static void demo (Account account) { List<Thread> list = new ArrayList <>(); long start = System.nanoTime(); for (int i = 0 ; i < 1000 ; i++) { list.add(new Thread (() -> { account.withdraw(10 ); })); } list.forEach(Thread::start); list.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end-start)/1000_000 + " ms" ); } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
如上代码加锁会造成线程堵塞,堵塞的时间取决于临界区代码执行的时间,这使用加锁的性能不高,我们可以使用无锁来解决此问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class AccountSafe implements Account { AtomicInteger atomicInteger ; public AccountSafe (Integer balance) { this .atomicInteger = new AtomicInteger (balance); } @Override public Integer getBalance () { return atomicInteger.get(); } @Override public void withdraw (Integer amount) { while (true ){ int pre = getBalance(); int next = pre - amount; if (atomicInteger.compareAndSet(pre,next)){ break ; } } } } 12345678910111213141516171819202122232425
2、CAS 与 volatile 1)cas 前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢? 其中的关键是 compareAndSwap(比较并设置值),它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作 。 如图所示,它的工作流程如下: 当一个线程要去修改 Account 对象中的值时,先获取值 preVal(调用get方法),然后再将其设置为新的值 nextVal(调用 cas 方法)。在调用 cas 方法时,会将 pre 与 Account 中的余额进行比较。
如果两者相等,就说明该值还未被其他线程修改,此时便可以进行修改操作。
如果两者不相等,就不设置值,重新获取值 preVal(调用get方法),然后再将其设置为新的值 nextVal(调用cas方法),直到修改成功为止。
注意:
其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的 。
2)volatile 获取共享变量时,为了保证该变量的可见性 ,需要使用 volatile 修饰。 它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。 注意 volatile 仅仅保证了共享变量的可见性,让其它线程能够看到新值,但不能解决指令交错问题(不能保证原子性) CAS 是原子性操作借助 volatile 读取到共享变量的新值来实现【比较并交换】的效果
3)为什么无锁效率高
无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
4)CAS 的特点 结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响
3、原子整数 java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类: 使用原子的方式更新基本类型
AtomicInteger:整型原子类
AtomicLong:长整型原子类
AtomicBoolean :布尔型原子类
上面三个类提供的方法几乎相同,所以我们将以 AtomicInteger 为例子来介绍。 原子引用 原子数组 字段更新器 原子累加器 下面先讨论原子整数类,以 AtomicInteger 为例讨论它的api接口:通过观察源码可以发现,AtomicInteger 内部都是通过cas的原理来实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static void main (String[] args) { AtomicInteger i = new AtomicInteger (0 ); System.out.println(i.getAndIncrement()); System.out.println(i.incrementAndGet()); System.out.println(i.decrementAndGet()); System.out.println(i.getAndDecrement()); System.out.println(i.getAndAdd(5 )); System.out.println(i.addAndGet(-5 )); System.out.println(i.getAndUpdate(p -> p - 2 )); System.out.println(i.updateAndGet(p -> p + 2 )); System.out.println(i.getAndAccumulate(10 , (p, x) -> p + x)); System.out.println(i.accumulateAndGet(-10 , (p, x) -> p + x)); } 1234567891011121314151617181920212223242526272829
4、原子引用 为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。 基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。
AtomicReference:引用类型原子类
AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起。
1)AtomicReference 先看如下代码的问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class DecimalAccountUnsafe implements DecimalAccount { BigDecimal balance; public DecimalAccountUnsafe (BigDecimal balance) { this .balance = balance; } @Override public BigDecimal getBalance () { return balance; } @Override public void withdraw (BigDecimal amount) { BigDecimal balance = this .getBalance(); this .balance = balance.subtract(amount); } } 12345678910111213141516
当执行 withdraw 方法时,可能会有线程安全,我们可以加锁解决或者是使用无锁的方式 CAS 来解决,解决方式是用 AtomicReference 原子引用解决。 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class DecimalAccountCas implements DecimalAccount { private AtomicReference<BigDecimal> balance; public DecimalAccountCas (BigDecimal balance) { this .balance = new AtomicReference <>(balance); } @Override public BigDecimal getBalance () { return balance.get(); } @Override public void withdraw (BigDecimal amount) { while (true ) { BigDecimal preVal = balance.get(); BigDecimal nextVal = preVal.subtract(amount); if (balance.compareAndSet(preVal, nextVal)) { break ; } } } } 12345678910111213141516171819202122232425
2)ABA 问题 看如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static AtomicReference<String> ref = new AtomicReference <>("A" ); public static void main (String[] args) throws InterruptedException { log.debug("main start..." ); String preVal = ref.get(); other(); TimeUnit.SECONDS.sleep(1 ); log.debug("change A->C {}" , ref.compareAndSet(preVal, "C" )); } private static void other () throws InterruptedException { new Thread (() -> { log.debug("change A->B {}" , ref.compareAndSet(ref.get(), "B" )); }, "t1" ).start(); TimeUnit.SECONDS.sleep(1 ); new Thread (() -> { log.debug("change B->A {}" , ref.compareAndSet(ref.get(), "A" )); }, "t2" ).start(); } 12345678910111213141516171819
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。使用AtomicStampedReference来解决。
3)AtomicStampedReference 使用 AtomicStampedReference 加 stamp (版本号或者时间戳)的方式解决 ABA 问题。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static AtomicStampedReference<String> ref = new AtomicStampedReference <>("A" , 0 ); public static void main (String[] args) throws InterruptedException { log.debug("main start..." ); String preVal = ref.getReference(); int stamp = ref.getStamp(); log.info("main 拿到的版本号 {}" ,stamp); other(); TimeUnit.SECONDS.sleep(1 ); log.info("修改后的版本号 {}" ,ref.getStamp()); log.info("change A->C:{}" , ref.compareAndSet(preVal, "C" , stamp, stamp + 1 )); } private static void other () throws InterruptedException { new Thread (() -> { int stamp = ref.getStamp(); log.info("{}" ,stamp); log.info("change A->B:{}" , ref.compareAndSet(ref.getReference(), "B" , stamp, stamp + 1 )); }).start(); TimeUnit.SECONDS.sleep(1 ); new Thread (() -> { int stamp = ref.getStamp(); log.info("{}" ,stamp); log.debug("change B->A:{}" , ref.compareAndSet(ref.getReference(), "A" ,stamp,stamp + 1 )); }).start(); } 123456789101112131415161718192021222324252627
4)AtomicMarkableReference AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference 。
5、原子数组 使用原子的方式更新数组里的某个元素
AtomicIntegerArray:整形数组原子类
AtomicLongArray:长整形数组原子类
AtomicReferenceArray :引用类型数组原子类
上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class Code_10_AtomicArrayTest { public static void main (String[] args) throws InterruptedException { demo( () -> new int [10 ], (array) -> array.length, (array, index) -> array[index]++, (array) -> System.out.println(Arrays.toString(array)) ); TimeUnit.SECONDS.sleep(1 ); demo( () -> new AtomicIntegerArray (10 ), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), (array) -> System.out.println(array) ); } private static <T> void demo ( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer) { ArrayList<Thread> ts = new ArrayList <>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0 ; i < length; i++) { ts.add(new Thread (() -> { for (int j = 0 ; j < 10000 ; j++) { putConsumer.accept(array, j % length); } })); } ts.forEach(Thread::start); ts.forEach((thread) -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
使用原子数组可以保证元素的线程安全。
6、字段更新器
AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
注意:利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
1 2 Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type 1
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Code_11_AtomicReferenceFieldUpdaterTest { public static AtomicReferenceFieldUpdater ref = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name" ); public static void main (String[] args) throws InterruptedException { Student student = new Student (); new Thread (() -> { System.out.println(ref.compareAndSet(student, null , "list" )); }).start(); System.out.println(ref.compareAndSet(student, null , "张三" )); System.out.println(student); } } class Student { public volatile String name; @Override public String toString () { return "Student{" + "name='" + name + '\'' + '}' ; } } 12345678910111213141516171819202122232425262728
字段更新器就是为了保证类中某个属性线程安全问题。
7、原子累加器 1)AtomicLong Vs LongAdder 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public static void main (String[] args) { for (int i = 0 ; i < 5 ; i++) { demo(() -> new AtomicLong (0 ), (ref) -> ref.getAndIncrement()); } for (int i = 0 ; i < 5 ; i++) { demo(() -> new LongAdder (), (ref) -> ref.increment()); } } private static <T> void demo (Supplier<T> supplier, Consumer<T> consumer) { ArrayList<Thread> list = new ArrayList <>(); T adder = supplier.get(); for (int i = 0 ; i < 4 ; i++) { list.add(new Thread (() -> { for (int j = 0 ; j < 500000 ; j++) { consumer.accept(adder); } })); } long start = System.nanoTime(); list.forEach(t -> t.start()); list.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start)/1000_000 ); } 12345678910111213141516171819202122232425262728293031323334
执行代码后,发现使用 LongAdder 比 AtomicLong 快2,3倍,使用 LongAdder 性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
8、LongAdder 原理 LongAdder 类有几个关键域 public class LongAdder extends Striped64 implements Serializable {} 下面的变量属于 Striped64 被 LongAdder 继承。
1 2 3 4 5 6 7 transient volatile Cell[] cells;transient volatile long base;transient volatile int cellsBusy; 123456
1)使用 cas 实现一个自旋锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class Code_13_LockCas { public AtomicInteger state = new AtomicInteger (0 ); public void lock () { while (true ) { if (state.compareAndSet(0 , 1 )) { break ; } } } public void unlock () { log.debug("unlock..." ); state.set(0 ); } public static void main (String[] args) { Code_13_LockCas lock = new Code_13_LockCas (); new Thread (() -> { log.info("begin..." ); lock.lock(); try { log.info("上锁成功" ); TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }, "t1" ).start(); new Thread (() -> { log.info("begin..." ); lock.lock(); try { log.info("上锁成功" ); TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }, "t2" ).start(); } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
2)原理之伪共享 其中 Cell 即为累加单元
1 2 3 4 5 6 7 8 9 10 11 12 @sun .misc.Contendedstatic final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long prev, long next) { return UNSAFE.compareAndSwapLong(this , valueOffset, prev, next); } } 1234567891011
下面讨论 @sun.misc.Contended 注解的重要意义 得从缓存说起,缓存与内存的速度比较 因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。缓存离 cpu 越近速度越快。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。 因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了: Core-0 要修改 Cell[0],Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效,@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
3)add 方法分析 LongAdder 进行累加操作是调用 increment 方法,它又调用 add 方法。
1 2 3 4 public void increment () { add(1L ); } 123
第一步:add 方法分析,流程图如下 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if ( as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)) ) { longAccumulate(x, null , uncontended); } } } 1234567891011121314151617181920212223242526
第二步:longAccumulate 方法分析,流程图如下: 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell (x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell [2 ]; rs[h & 1 ] = new Cell (x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
4)sum 方法分析 获取最终结果通过 sum 方法,将各个累加单元的值加起来就得到了总的结果。
1 2 3 4 5 6 7 8 9 10 11 12 public long sum () { Cell[] as = cells; Cell a; long sum = base; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; } 1234567891011
5、Unsafe 1)Unsafe 对象的获取 Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport 的 park 方法,cas 相关的方法底层都是通过Unsafe类来实现的。
1 2 3 4 5 6 7 8 public static void main (String[] args) throws NoSuchFieldException, IllegalAccessException { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe" ); theUnsafe.setAccessible(true ); Unsafe unsafe = (Unsafe)theUnsafe.get(null ); } 1234567
2)Unsafe 模拟实现 cas 操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class Code_14_UnsafeTest { public static void main (String[] args) throws NoSuchFieldException, IllegalAccessException { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe" ); theUnsafe.setAccessible(true ); Unsafe unsafe = (Unsafe)theUnsafe.get(null ); long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id" )); long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name" )); Teacher teacher = new Teacher (); unsafe.compareAndSwapLong(teacher, idOffset, 0 , 100 ); unsafe.compareAndSwapObject(teacher, nameOffset, null , "lisi" ); System.out.println(teacher); } } @Data class Teacher { private volatile int id; private volatile String name; } 12345678910111213141516171819202122232425262728293031
3)Unsafe 模拟实现原子整数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class Code_15_UnsafeAccessor { public static void main (String[] args) { Account.demo(new MyAtomicInteger (10000 )); } } class MyAtomicInteger implements Account { private volatile Integer value; private static final Unsafe UNSAFE = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = UNSAFE.objectFieldOffset (AtomicInteger.class.getDeclaredField("value" )); } catch (Exception ex) { throw new Error (ex); } } public MyAtomicInteger (Integer value) { this .value = value; } public Integer get () { return value; } public void decrement (Integer amount) { while (true ) { Integer preVal = this .value; Integer nextVal = preVal - amount; if (UNSAFE.compareAndSwapObject(this , valueOffset, preVal, nextVal)) { break ; } } } @Override public Integer getBalance () { return get(); } @Override public void withdraw (Integer amount) { decrement(amount); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
结论 本章重点讲解
CAS 与 volatile
juc 包下 API 1. 原子整数 2. 原子引用 3. 原子数组 4. 字段更新器 5. 原子累加器
Unsafe
原理方面 6. LongAdder 源码 7. 伪共享
六、共享模型之不可变 1、日期转换的问题 问题提出,下面的代码在运行时,由于 SimpleDateFormat 不是线程安全的,有很大几率出现 java.lang.NumberFormatException 或者出现不正确的日期解析结果。
1 2 3 4 5 6 7 8 9 10 11 SimpleDateFormat sdf = new SimpleDateFormat ("yyyy-MM-dd" ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { log.debug("{}" , sdf.parse("1951-04-21" )); } catch (Exception e) { log.error("{}" , e); } }).start(); } 12345678910
思路 - 不可变对象 如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在 Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类 DateTimeFormatter
1 2 3 4 5 6 7 8 DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd" ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { LocalDate date = dtf.parse("2018-10-01" , LocalDate::from); log.debug("{}" , date); }).start(); } 1234567
2、不可变设计 String类中不可变的体现
1 2 3 4 5 6 7 8 9 public final class String implements java .io.Serializable, Comparable<String>, CharSequence { private final char value[]; private int hash; } 12345678
1)final 的使用 发现该类、类中所有属性都是 final 的
属性用 final 修饰保证了该属性是只读的,不能修改
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
2)保护性拷贝 但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是 如何实现的,就以 substring 为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public String substring (int beginIndex, int endIndex) { if (beginIndex < 0 ) { throw new StringIndexOutOfBoundsException (beginIndex); } if (endIndex > value.length) { throw new StringIndexOutOfBoundsException (endIndex); } int subLen = endIndex - beginIndex; if (subLen < 0 ) { throw new StringIndexOutOfBoundsException (subLen); } return ((beginIndex == 0 ) && (endIndex == value.length)) ? this : new String (value, beginIndex, subLen); } 123456789101112131415
发现其内部是调用 String 的构造方法创建了一个新字符串
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public String (char value[], int offset, int count) { if (offset < 0 ) { throw new StringIndexOutOfBoundsException (offset); } if (count <= 0 ) { if (count < 0 ) { throw new StringIndexOutOfBoundsException (count); } if (offset <= value.length) { this .value = "" .value; return ; } } if (offset > value.length - count) { throw new StringIndexOutOfBoundsException (offset + count); } this .value = Arrays.copyOfRange(value, offset, offset+count); } 1234567891011121314151617181920
构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】
3、模式之享元 1)简介 简介定义英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时,归类为:Structual patterns
2)体现 包装类 在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法。 例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
1 2 3 4 5 6 7 8 public static Long valueOf (long l) { final int offset = 128 ; if (l >= -128 && l <= 127 ) { return LongCache.cache[(int )l + offset]; } return new Long (l); } 1234567
Byte, Short, Long 缓存的范围都是 -128127 Character 缓存的范围是 0127 Integer 的默认范围是 -128~127,最小值不能变,但最大值可以通过调整虚拟机参数 “-Djava.lang.Integer.IntegerCache.high “来改变 Boolean 缓存了 TRUE 和 FALSEString 池 参考如下文章:JDK1.8关于运行时常量池, 字符串常量池的要点 BigDecimal、BigInteger
3)DIY 实现简单的数据库连接池 例如:一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。 代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 public class Code_17_DatabaseConnectionPoolTest { public static void main (String[] args) { Pool pool = new Pool (2 ); for (int i = 0 ; i < 5 ; i++) { new Thread (() -> { Connection connection = pool.borrow(); try { Thread.sleep(new Random ().nextInt(1000 )); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(connection); }).start(); } } } @Slf4j(topic = "c.Pool") class Pool { private final int poolSize; private Connection[] connections; private AtomicIntegerArray status; public Pool (int poolSize) { this .poolSize = poolSize; status = new AtomicIntegerArray (new int [poolSize]); connections = new Connection [poolSize]; for (int i = 0 ; i < poolSize; i++) { connections[i] = new MockConnection ("连接" + (i + 1 )); } } public Connection borrow () { while (true ) { for (int i = 0 ; i < poolSize; i++) { if (0 == status.get(i)) { if (status.compareAndSet(i,0 , 1 )) { log.info("获取连接:{}" , connections[i]); return connections[i]; } } } synchronized (this ) { try { log.info("wait ..." ); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public void free (Connection connection) { for (int i = 0 ; i < poolSize; i++) { if (connections[i] == connection) { status.set(i, 0 ); log.info("释放连接:{}" , connections[i]); synchronized (this ) { notifyAll(); } } } } } class MockConnection implements Connection { private String name; public MockConnection (String name) { this .name = name; } @Override public String toString () { return "MockConnection{" + "name='" + name + '\'' + '}' ; } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
以上实现没有考虑:
连接的动态增长与收缩
连接保活(可用性检测)
等待超时处理
分布式 hash
对于关系型数据库,有比较成熟的连接池的实现,例如 c3p0、druid 等 对于更通用的对象池,可以考虑用 apache commons pool,例如 redis 连接池可以参考 jedis 中关于连接池的实现。
4、final的原理 1)设置 final 变量的原理 理解了 volatile 原理,再对比 final 的实现就比较简单了
1 2 3 4 public class TestFinal { final int a = 20 ; } 123
字节码
1 2 3 4 5 6 7 8 0 : aload_01 : invokespecial #1 4 : aload_05 : bipush 20 7 : putfield #2 <-- 写屏障 10 : return 1234567
final 变量的赋值操作都必须在定义时或者构造器中进行初始化赋值,并发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况。
2)获取 final 变量的原理 需要从字节码层面去理解,可以参考如下文章:深入理解final关键字
结论
不可变类使用
不可变类设计
原理方面:final
模式方面
并发编程已完结,章节如下: Java 并发编程上篇 -(Synchronized 原理、LockSupport 原理、ReentrantLock 原理) Java 并发编程中篇 -(JMM、CAS 原理、Volatile 原理) Java 并发编程下篇 -(线程池) Java 并发编程下篇 -(JUC、AQS 源码、ReentrantLock 源码)
七、共享模型之工具 1、线程池 池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。 线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。 这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
2、自定义线程池 上图就是一个线程池的实现,先初始化线程池、阻塞队列大小,然后开几个线程通过线程池对象调用方法执行任务,线程池中的线程会执行任务,如果任务过多,会添加到阻塞队列中,执行完任务再从阻塞队列中取值继续执行。当执行的线程数大于线程池和阻塞队列的大小,我们可以定义拒绝策略,类似 jdk 线程池那样。代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 @Slf4j(topic = "c.Code_01_TestPool") public class Code_01_ThreadPoolTest { public static void main (String[] args) { ThreadPool threadPool = new ThreadPool (1 , 1000 , TimeUnit.MILLISECONDS, 1 , (queue, task) -> { task.run(); }); for (int i = 0 ; i < 4 ; i++) { int j = i; threadPool.executor(() ->{ try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("{}" , j); }); } } } @FunctionalInterface interface RejectPolicy <T> { void reject (BlockingQueue<T> queue, T task) ; } @Slf4j(topic = "c.ThreadPool") class ThreadPool { private Set<Worker> works = new HashSet <Worker>(); private BlockingQueue<Runnable> taskQueue; private int coreSize; private long timeout; private TimeUnit unit; private RejectPolicy<Runnable> rejectPolicy; public ThreadPool (int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .timeout = timeout; this .unit = unit; taskQueue = new BlockingQueue <>(queueCapacity); this .rejectPolicy = rejectPolicy; } public void executor (Runnable task) { synchronized (works) { if (works.size() < coreSize) { Worker worker = new Worker (task); log.info("新增 worker {} ,任务 {}" , worker, task); works.add(worker); worker.start(); } else { taskQueue.tryPut(rejectPolicy, task); } } } class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout, unit)) != null ) { try { log.info("正在执行 {}" , task); task.run(); }catch (Exception e) { } finally { task = null ; } } synchronized (works) { log.info("worker 被移除 {}" , this ); works.remove(this ); } } } } @Slf4j(topic = "c.BlockingQueue") class BlockingQueue <T> { private int capacity; private Deque<T> queue; private ReentrantLock lock; private Condition fullWaitSet; private Condition emptyWaitSet; public BlockingQueue (int capacity) { queue = new ArrayDeque <>(capacity); lock = new ReentrantLock (); fullWaitSet = lock.newCondition(); emptyWaitSet = lock.newCondition(); this .capacity = capacity; } public T poll (long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0 ) { return null ; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; }finally { lock.unlock(); } } public T take () { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; }finally { lock.unlock(); } } public void put (T task) { lock.lock(); try { while (queue.size() == capacity) { try { log.info("等待加入任务队列 {}" , task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); }finally { lock.unlock(); } } public boolean offer (T task, long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.size() == capacity) { try { if (nanos <= 0 ) { return false ; } log.info("等待加入任务队列 {}" , task); nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); return true ; }finally { lock.unlock(); } } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capacity) { rejectPolicy.reject(this , task); } else { log.info("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } public int getSize () { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
3、ThreadPoolExecutor 1)线程池状态 ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,ThreadPoolExecutor 类中的线程状态变量如下:
1 2 3 4 5 6 7 8 9 private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; 12345678
状态名称
高3位的值
描述
RUNNING
111
接收新任务,同时处理任务队列中的任务
SHUTDOWN
000
不接受新任务,但是处理任务队列中的任务
STOP
001
中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING
010
任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED
011
终结状态
线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示
使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值
1 2 3 4 5 6 7 8 9 10 11 12 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;1234567891011
获取线程池状态、线程数量以及合并两个值的操作
1 2 3 4 5 6 7 8 9 10 11 12 private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }1234567891011
线程池的属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... } private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock ();private final HashSet<Worker> workers = new HashSet <Worker>();123456789101112131415
2)构造方法 首先我们看一下 ThreadPoolExecutor 类参数最多、最全的有参构造方法。
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 1234567
构造参数解释:
corePoolSize:核心线程数
maximumPoolSize:最大线程数
maximumPoolSize - corePoolSize = 救急线程数
keepAliveTime:救急线程空闲时的最大生存时间
unit:时间单位
workQueue:阻塞队列(存放任务)
有界阻塞队列 ArrayBlockingQueue
无界阻塞队列 LinkedBlockingQueue
最多只有一个同步元素的队列 SynchronousQueue
优先队列 PriorityBlockingQueue
threadFactory:线程工厂(给线程取名字)
handler:拒绝策略
工作方式:
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排 队,直到有空闲的线程。
如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急。
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 下面的前 4 种实现,其它著名框架也提供了实现
ThreadPoolExecutor.AbortPolicy 让调用者抛出RejectedExecutionException 异常,这是默认策略
ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务
ThreadPoolExecutor.DiscardPolicy 放弃本次任务
ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
Netty 的实现,是创建一个新线程来执行任务
ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。
jdk 线程池的拒绝策略结构图如下: 根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池
3)newFixedThreadPool 这个是 Executors 类提供的静态的工厂方法来创建线程池!Executors 是 Executor 框架的工具类,newFixedThreadPool 创建的是固定大小的线程池。实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ExecutorService executorService = Executors.newFixedThreadPool(2 , new ThreadFactory () { private AtomicInteger atomicInteger = new AtomicInteger (1 ); @Override public Thread newThread (Runnable r) { return new Thread (r, "my_thread_" + atomicInteger.getAndIncrement()); } }); executorService.execute(() -> { log.info("1" ); }); executorService.execute(() -> { log.info("2" ); }); executorService.execute(() -> { log.info("3" ); }); 123456789101112131415161718
然后我再看看 Executors 类 使用 newFixedThreadPool 如何创建线程的,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } 12345678910111213
通过源码可以看到 new ThreadPoolExecutor(xxx) 方法其实是是调用了之前说的完整参数的构造方法,创建的是固定的线程数,使用了默认的线程工厂和拒绝策略。特点:
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
阻塞队列是无界的(LinkedBlockingQueue),可以放任意数量的任务
适用于任务量已知,相对耗时的任务
4)newCachedThreadPool 1 2 3 4 5 6 7 ExecutorService executorService = Executors.newCachedThreadPool(); public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); } 123456
特点
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
全部都是救急线程(60s 后没有任务就回收)
救急线程可以无限创建
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交 货)SynchronousQueue
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。
适合任务数比较密集,但每个任务执行时间较短的情况
5)newSingleThreadExecutor 1 2 3 4 5 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 ,0L , TimeUnit.MILLISECONDS,new LinkedBlockingQueue <Runnable>())); } 1234
使用场景:
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
newSingleThreadExecutor 和 newFixedThreadPool 区别:
和自己创建单线程执行任务的区别:自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
Executors.newSingleThreadExecutor() 线程个数始终为 1 ,不能修改 FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因 此不能调用 ThreadPoolExecutor 中特有的方法
和Executors.newFixedThreadPool(1) 初始时为1时的区别:Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
注意,Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
说白了就是:使用有界队列,控制线程创建数量。 除了避免 OOM 的原因之外,不推荐使用 Executors提供的两种快捷的线程池的原因还有:
实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
我们应该显示地给我们的线程池命名,这样有助于我们定位问题。
6)提交任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void execute (Runnable command) ;<T> Future<T> submit (Callable<T> task) ; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 123456789101112131415161718
7)关闭线程池 shutdown
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } 12345678910111213141516171819202122
shutdownNow
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } 1234567891011121314151617181920212223242526
其它方法
1 2 3 4 5 6 7 boolean isShutdown () ;boolean isTerminated () ;boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException;123456
8)任务调度线程池 在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但 由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个 任务的延迟或异常都将会影响到之后的任务。 使用 ScheduledExecutorService 改写:
整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务。
ScheduledExecutorService 中 scheduleAtFixedRate 方法的使用,是 一段时间 的 期间。
ScheduledExecutorService 中 scheduleWithFixedDelay 方法的使用,是 一段时间 的 间隔。
9)正确处理执行任务异常 可以发现,如果线程池中的线程执行任务时,如果任务抛出了异常,默认是中断执行该任务而不是抛出异常或者打印异常信息。 方法1:主动捉异常
1 2 3 4 5 6 7 8 9 10 ExecutorService pool = Executors.newFixedThreadPool(1 ); pool.submit(() -> { try { log.debug("task1" ); int i = 1 / 0 ; } catch (Exception e) { log.error("error:" , e); } }); 123456789
方法2:使用 Future,错误信息都被封装进submit方法的返回方法中
1 2 3 4 5 6 7 8 ExecutorService pool = Executors.newFixedThreadPool(1 ); Future<Boolean> f = pool.submit(() -> { log.debug("task1" ); int i = 1 / 0 ; return true ; }); log.debug("result:{}" , f.get()); 1234567
10)Tomcat 线程池
LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
Acceptor 只负责【接收新的 socket 连接】
Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
Executor 线程池中的工作线程最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同,如果总线程数达到 maximumPoolSize,这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常。 源码 tomcat-7.0.42
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public void execute (Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super .execute(command); } catch (RejectedExecutionException rx) { if (super .getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super .getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException ("Queue capacity is full." ); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException (x); } } else { submittedCount.decrementAndGet(); throw rx; } } } public boolean force (Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if ( parent.isShutdown() ) throw new RejectedExecutionException ( "Executor not running, can't force a command into the queue" ); return super .offer(o,timeout,unit); is rejected } 123456789101112131415161718192021222324252627282930313233
Connector 配置如下: Executor 线程池配置如下: 可以看到该线程池实现的是一个无界的队列,所以说是不是执行任务的线程数大于了核心线程数,都会添加到阻塞队列中,那么救急线程是不是就不会用到呢,其实不是,分析如下图: 如图所示,当添加新的任务时,如果提交的任务大于核心线程数小于最大线程数就创建救急线程,否则就加入任务队列中。
异步模式之工作线程 定义: 让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。例如: 海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那 么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message) 注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率 例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工。饥饿: 固定大小线程池会有饥饿现象
两个工人是同一个线程池中的两个线程
他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作 1. 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待 2. 后厨做菜:没啥说的,做就是了
比如工人 A 处理了点餐任务,接下来它要等着 工人 B 把菜做好,然后上菜,他俩也配合的蛮好 但现在同时来了两个客人,这个时候工人 A 和工人 B 都去处理点餐了,这时没人做饭了,这就是饥饿。
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池。实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 @Slf4j(topic = "c.Code_07_StarvationTest") public class Code_07_StarvationTest { public static List<String> list = new ArrayList <>(Arrays.asList("宫保鸡丁" , "青椒肉丝" , "千张肉丝" )); public static Random random = new Random (); public static String cooking () { return list.get(random.nextInt(list.size())); } public static void main (String[] args) { ExecutorService cookPool = Executors.newFixedThreadPool(1 ); ExecutorService waiterPool = Executors.newFixedThreadPool(1 ); waiterPool.execute(() -> { log.info("处理点餐" ); Future<String> f = cookPool.submit(() -> { log.info("做菜" ); return cooking(); }); try { log.info("上菜 {} " , f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); waiterPool.execute(() -> { log.info("处理点餐" ); Future<String> f2 = cookPool.submit(() -> { log.info("做菜" ); return cooking(); }); try { log.info("上菜 {} " , f2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); } public static void test1 ( ExecutorService executorService) { executorService.execute(() -> { log.info("处理点餐" ); Future<String> f = executorService.submit(() -> { log.info("做菜" ); return cooking(); }); try { log.info("上菜 {} " , f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); executorService.execute(() -> { log.info("处理点餐" ); Future<String> f2 = executorService.submit(() -> { log.info("做菜" ); return cooking(); }); try { log.info("上菜 {} " , f2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
创建多大的线程池合适? 过小会导致程序不能充分地利用系统资源、容易导致饥饿,过大会导致更多的线程上下文切换,占用更多内存,
CPU 密集型运算 通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算 CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。 1. 经验公式如下: 2. 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间 3. 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 50% = 8 4. 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 10% = 40
应用之定时任务 使用 newScheduledThreadPool 中的 scheduleAtFixedRate 这个方法可以执行定时任务。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) { LocalDateTime now = LocalDateTime.now(); System.out.println(now); LocalDateTime time = now.withHour(18 ).withMinute(0 ).withSecond(0 ).withNano(0 ).with(DayOfWeek.THURSDAY); if (now.compareTo(time) > 0 ) { time = time.plusWeeks(1 ); } long initalDelay = Duration.between(now, time).toMillis(); long period = 1000 * 60 * 60 * 24 * 7 ; ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1 ); executorService.scheduleAtFixedRate(() -> { System.out.println("running" ); }, initalDelay, period, TimeUnit.MILLISECONDS); } 1234567891011121314151617181920
4、Fork/Join 参考如下文章:Fork/Join框架原理解析 Fork/Join框架学习
并发编程已完结,章节如下: Java 并发编程上篇 -(Synchronized 原理、LockSupport 原理、ReentrantLock 原理) Java 并发编程中篇 -(JMM、CAS 原理、Volatile 原理) Java 并发编程下篇 -(线程池) Java 并发编程下篇 -(JUC、AQS 源码、ReentrantLock 源码)
5、J.U.C AQS 原理 1、概述 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
2、特点
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
1 2 3 4 5 6 7 8 9 10 11 12 if (!tryAcquire(arg)) { } if (tryRelease(arg)) { } 1234567891011
3、自定义同步器 下面实现一个不可重入的阻塞式锁:使用 AbstractQueuedSynchronizer 自定义一个同步器来实现自定义锁,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 @Slf4j(topic = "c.Code_11_UnRepeatLockTest") public class Code_11_UnRepeatLockTest { public static void main (String[] args) { MyLock myLock = new MyLock (); new Thread (() -> { myLock.lock(); log.info("lock ... " ); myLock.lock(); try { log.info("starting ... " ); Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("unlock ... " ); myLock.unlock(); } }, "t1" ).start(); } } class MyLock implements Lock { class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { if (compareAndSetState(1 , 0 )) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } return false ; } public Condition newCondition () { return new ConditionObject (); } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } } private MySync mySync = new MySync (); @Override public void lock () { mySync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { mySync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return mySync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return mySync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { mySync.release(1 ); } @Override public Condition newCondition () { return mySync.newCondition(); } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
ReentrantLock 原理 可以看到 ReentrantLock 提供了两个同步器,实现公平锁和非公平锁,默认是非公平锁!
1、非公平锁实现原理 1)加锁解锁流程 先从构造器开始看,默认为非公平锁实现
1 2 3 4 public ReentrantLock () { sync = new NonfairSync (); } 123
NonfairSync 继承自 AQS 没有竞争时
1 2 3 4 5 6 7 8 9 10 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } 123456789
第一个竞争出现时
1 2 3 4 5 6 7 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 123456
Thread-1 执行了
ock方法中CAS 尝试将 state 由 0 改为 1,结果失败
lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败
接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
Node 的创建是懒惰的
其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquire方法的 acquireQueued 逻辑
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败
进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)
再从次有多个线程经历上述过程竞争失败,变成这个样子 Thread-0 释放锁,进入 tryRelease 流程,如果成功
设置 exclusiveOwnerThread 为 null
state = 0
如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程:
unparkSuccessor 中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程 如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)
exclusiveOwnerThread 为 Thread-1,state = 1
head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了 如果不巧又被 Thread-4 占了先
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) { tail = head; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) { return true ; } if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
解锁源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 static final class NonfairSync extends Sync { public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if ( h != null && h.waitStatus != 0 ) { unparkSuccessor(h); } return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) { compareAndSetWaitStatus(node, ws, 0 ); } Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
2、锁重入原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 static final class NonfairSync extends Sync { final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } } 1234567891011121314151617181920212223242526272829303132333435363738394041
3、可打断原理 不可打断模式: 在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 static final class NonfairSync extends Sync { private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } static void selfInterrupt () { Thread.currentThread().interrupt(); } } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
可打断模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static final class NonfairSync extends Sync { public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException (); } } } finally { if (failed) cancelAcquire(node); } } } 1234567891011121314151617181920212223242526272829303132333435
4、公平锁原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ( (s = h.next) == null || s.thread != Thread.currentThread() ); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
5、条件变量实现原理 每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObjectawait 流程 开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部 接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁 unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功 park 阻塞 Thread-0signal 流程 假设 Thread-1 要来唤醒 Thread-0 进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1 Thread-1 释放锁,进入 unlock 流程。 源码分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject () { } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) { lastWaiter = null ; } first.nextWaiter = null ; } while ( !transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if ( ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) ) { LockSupport.unpark(node.thread); } return true ; } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } private void unlinkCancelledWaiters () { } public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignalAll(first); } public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException (); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } private static final int REINTERRUPT = 1 ; private static final int THROW_IE = -1 ; private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException (); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await () throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException (); } Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } public final long awaitNanos (long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException (); } Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil (Date deadline) throws InterruptedException { } public final boolean await (long time, TimeUnit unit) throws InterruptedException { } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
1、ReentrantReadWriteLock 当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!
提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法 。
实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class Code_12_ReadWriteLockTest { public static void main (String[] args) throws InterruptedException { DataContainer dataContainer = new DataContainer (); Thread t1 = new Thread (() -> { dataContainer.read(); }, "t1" ); Thread t2 = new Thread (() -> { dataContainer.write(); }, "t2" ); t1.start(); t2.start(); } } @Slf4j(topic = "c.DataContainer") class DataContainer { private Object object = new Object (); private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock (); private ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); private ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); public Object read () { readLock.lock(); log.info("拿到读锁!" ); try { log.info("读取操作 ..." ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } }finally { readLock.unlock(); log.info("释放读锁!" ); } return object; } public void write () { writeLock.lock(); log.info("拿到写锁!" ); try { log.info("写操作 ... " ); }finally { writeLock.unlock(); log.info("释放写锁!" ); } } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
注意事项
读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
1 2 3 4 5 6 7 8 9 10 11 12 13 r.lock(); try { w.lock(); try { } finally { w.unlock(); } } finally { r.unlock(); } 123456789101112
重入时降级支持:即持有写锁的情况下去获取读锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock (); void processCachedData () { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true ; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } } 1234567891011121314151617181920212223242526272829303132
2、应用之缓存 缓存更新策略: 更新时,是先清缓存还是先更新数据库? 先清除缓存操作如下:
先更新数据库操作如下: 补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询:这种情况的出现几率非常小: 实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 public class Code_13_ReadWriteCacheTest { public static void main (String[] args) { GeneriCacheDao<Object> generiCacheDao = new GeneriCacheDao <>(); Object[] objects = new Object [2 ]; generiCacheDao.queryOne(Object.class,"Test" ,objects); generiCacheDao.queryOne(Object.class,"Test" ,objects); generiCacheDao.queryOne(Object.class,"Test" ,objects); generiCacheDao.queryOne(Object.class,"Test" ,objects); System.out.println(generiCacheDao.map); generiCacheDao.update("Test" ,objects); System.out.println(generiCacheDao.map); } } class GeneriCacheDao <T> extends GenericDao { HashMap<SqlPair, T> map = new HashMap <>(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); GenericDao genericDao = new GenericDao (); @Override public int update (String sql, Object... params) { lock.writeLock().lock(); SqlPair sqlPair = new SqlPair (sql, params); try { int update = genericDao.update(sql, params); map.clear(); return update; } finally { lock.writeLock().unlock(); } } @Override public T queryOne (Class beanClass, String sql, Object... params) { SqlPair key = new SqlPair (sql, params); lock.readLock().lock(); try { T t = map.get(key); if (t != null ){ return t; } } finally { lock.readLock().unlock(); } lock.writeLock().lock(); try { T value = map.get(key); if (value == null ){ value = (T) genericDao.queryOne(beanClass, sql, params); map.put(key, value); } return value; } finally { lock.writeLock().unlock(); } } class SqlPair { private String sql; private Object[] params; public SqlPair (String sql, Object[] params) { this .sql = sql; this .params = params; } @Override public boolean equals (Object o) { if (this == o) return true ; if (o == null || getClass() != o.getClass()) return false ; SqlPair sqlMap = (SqlPair) o; return Objects.equals(sql, sqlMap.sql) && Arrays.equals(params, sqlMap.params); } @Override public int hashCode () { int result = Objects.hash(sql); result = 31 * result + Arrays.hashCode(params); return result; } } } class GenericDao <T>{ public int update (String sql, Object... params) { return 1 ; } public T queryOne (Class<T> beanClass, String sql, Object... params) { System.out.println("查询数据库中" ); return (T) new Object (); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
3、读写锁原理 图解流程
读写锁用的是同一个 Sync 同步器,因此等待队列、state 等也是同一个下面执行:t1 w.lock,t2 r.lock 情况
1)t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位 2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败。 tryAcquireShared 返回值表示
-1 表示失败
0 表示成功,但后继节点不会继续唤醒
正数表示成功,而且数值是还有几个后继节点需要唤醒,我们这里的读写锁返回 1
3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态 4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁 5)如果没有成功,在 doAcquireShared 内 for (;😉 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;😉 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park。
又继续执行 :t3 r.lock,t4 w.lock 这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子继续执行 t1 w.unlock 这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子 接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行,图中的t2从黑色变成了蓝色(注意这里只是恢复运行而已,并没有获取到锁!) 这回再来一次 for (;; ) 执行 tryAcquireShared 成功则让读锁计数加一 这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点 事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行. 这回再来一次 for (;; ) 执行 tryAcquireShared 成功则让读锁计数加一 这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点 再继续执行t2 r.unlock,t3 r.unlock t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零 t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即 之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;; ) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束 源码分析: 写锁上锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 static final class NonfairSync extends Sync { public void lock () { sync.acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if ( w == 0 || current != getExclusiveOwnerThread() ) { return false ; } if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if ( writerShouldBlock() || !compareAndSetState(c, c + acquires) ) { return false ; } setExclusiveOwnerThread(current); return true ; } final boolean writerShouldBlock () { return false ; } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
写锁释放流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static final class NonfairSync extends Sync { public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) { setExclusiveOwnerThread(null ); } setState(nextc); return free; } } 1234567891011121314151617181920212223242526272829303132333435
读锁上锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 static final class NonfairSync extends Sync { public void lock () { sync.acquireShared(1 ); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) { doAcquireShared(arg); } } protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if ( exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current ) { return -1 ; } int r = sharedCount(c); if ( !readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT) ) { return 1 ; } return fullTryAcquireShared(current); } final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive(); } final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { } if (sharedCount(c) == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { return 1 ; } } } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
读锁释放流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 static final class NonfairSync extends Sync { public void unlock () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int unused) { for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { return nextc == 0 ; } } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
4、StampedLock 该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁
1 2 3 long stamp = lock.readLock();lock.unlockRead(stamp); 12
加解写锁
1 2 3 long stamp = lock.writeLock();lock.unlockWrite(stamp); 12
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1 2 3 4 5 6 long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)){ } 12345
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法。 代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class Code_14_StampedLockTest { public static void main (String[] args) throws InterruptedException { StampedLockDataContainer dataContainer = new StampedLockDataContainer (1 ); Thread t1 = new Thread (() -> { try { System.out.println(dataContainer.read(1 )); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1" ); t1.start(); TimeUnit.MILLISECONDS.sleep(500 ); Thread t2 = new Thread (() -> { dataContainer.write(10 ); }, "t2" ); t2.start(); } } @Slf4j(topic = "c.StampedLockDataContainer") class StampedLockDataContainer { private int data; private StampedLock stampedLock = new StampedLock (); public StampedLockDataContainer (int data) { this .data = data; } public int read (int readTime) throws InterruptedException { long stamp = stampedLock.tryOptimisticRead(); log.info("optimistic read locking ...{}" , stamp); Thread.sleep(readTime * 1000 ); if (stampedLock.validate(stamp)) { log.info("read finish... {}" , stamp); return data; } log.info("update to read lock ..." ); try { stamp = stampedLock.readLock(); log.info("read lock {}" , stamp); Thread.sleep(readTime * 1000 ); log.info("read finish ... {}" , stamp); return data; } finally { stampedLock.unlockRead(stamp); } } public void write (int newData) { long stamp = stampedLock.writeLock(); try { log.info("write lock {}" , stamp); this .data = newData; try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("write finish ... {}" , stamp); log.info("write newData ... {}" , this .data); } finally { stampedLock.unlockWrite(stamp); } } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
注意: StampedLock 不支持条件变量 StampedLock 不支持可重入
Semaphore 1、基本使用 信号量,用来限制能同时访问共享资源的线程上限。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.info("start ..." ); Thread.sleep(1000 ); log.info("end ...." ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }, "t" + (i + 1 )).start();; } } 1234567891011121314151617181920212223242526
2、图解流程 Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源。 假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞 这时 Thread-4 释放了 permits,状态如下 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
3、源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits ) { super (permits ); } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if ( remaining < 0 || compareAndSetState(available, remaining) ) { return remaining; } } } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } } public void release () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
CountdownLatch CountDownLatch 允许多线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown方法时,其实使用了 tryReleaseShared 方法以CAS 的操作来减少 state ,直至 state 为 0 就代表所有的线程都调用了countDown方法。当调用 await 方法的时候,如果 state 不为0,就代表仍然有线程没有调用 countDown 方法,那么就把已经调用过 countDown 的线程都放入阻塞队列 Park ,并自旋 CAS 判断 state == 0,直至最后一个线程调用了 countDown ,使得 state == 0,于是阻塞的线程便判断成功,全部往下执行。
用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 @Slf4j(topic = "c.CountDownLatch") public class Code_16_CountDownLatchTest { public static void main (String[] args) throws InterruptedException { method3(); } public static void method1 () throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (3 ); new Thread (() -> { log.info("t1 start ..." ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t1 end ..." ); countDownLatch.countDown(); }, "t1" ).start(); new Thread (() -> { log.info("t2 start ..." ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t2 end ..." ); countDownLatch.countDown(); }, "t2" ).start(); new Thread (() -> { log.info("t3 start ..." ); try { Thread.sleep(1500 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t3 end ..." ); countDownLatch.countDown(); }, "t3" ).start(); log.info("main wait ..." ); countDownLatch.await(); log.info("main wait end ..." ); } public static void method2 () throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (3 ); ExecutorService executorService = Executors.newFixedThreadPool(4 ); executorService.submit(() -> { log.info("t1 start ..." ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); log.info("t1 end ...{}" , countDownLatch.getCount()); }); executorService.submit(() -> { log.info("t2 start ..." ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t2 end ...{}" , countDownLatch.getCount()); countDownLatch.countDown(); }); executorService.submit(() -> { log.info("t3 start ..." ); try { Thread.sleep(1500 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t3 end ...{}" , countDownLatch.getCount()); countDownLatch.countDown(); }); executorService.submit(() -> { log.info("main wait ..." ); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("main wait end ..." ); executorService.shutdown(); }); } public static void method3 () throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (10 ); ExecutorService executorService = Executors.newFixedThreadPool(10 ); String[] all = new String [10 ]; Random random = new Random (); for (int i = 0 ; i < 10 ; i++) { int id = i; executorService.submit(() -> { for (int j = 0 ; j <= 100 ; j++) { try { Thread.sleep(random.nextInt(100 )); } catch (InterruptedException e) { e.printStackTrace(); } all[id] = j + "%" ; System.out.print("\r" + Arrays.toString(all)); } countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println(); System.out.println("游戏开始" ); executorService.shutdown(); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
CyclicBarrier CyclicBarri[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。跟 CountdownLatch 一样,但这个可以重用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2 ); CyclicBarrier cyclicBarrier = new CyclicBarrier (2 , () -> { log.info("task2 finish ..." ); }); for (int i = 0 ; i < 3 ; i++) { executorService.submit(() -> { log.info("task1 begin ..." ); try { Thread.sleep(1000 ); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executorService.submit(() -> { log.info("task2 begin ..." ); try { Thread.sleep(2000 ); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } executorService.shutdown(); } 123456789101112131415161718192021222324252627282930
LinkedBlockingQueue 1)入队操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class LinkedBlockingQueue <E> extends AbstractQueue <E> implements BlockingQueue <E>, java.io.Serializable { static class Node <E> { E item; Node<E> next; Node(E x) { item = x; } } 123456789101112131415
1 2 3 4 5 6 7 private void enqueue (Node<E> node) { last = last.next = node; } 123456
初始化链表 last = head = new Node(null); Dummy 节点用来占位,item 为 null。 当一个节点入队 last = last.next = node; 再来一个节点入队 last = last.next = node;
2)出队操作 1 2 3 4 5 6 7 8 9 10 11 12 private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; } 1234567891011
h = head; first = h.next; h.next = h; head = first;
3)加锁分析 高明之处在于用了两把锁和 dummy 节点
用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
消费者与消费者线程仍然串行
生产者与生产者线程仍然串行
线程安全分析
当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争
当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
1 2 3 4 5 private final ReentrantLock putLock = new ReentrantLock ();private final ReentrantLock takeLock = new ReentrantLock ();1234
4)put 操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); int c = -1 ; Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); } 12345678910111213141516171819202122232425262728
5)take 操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull() return x; } 123456789101112131415161718192021222324
注意:由 put 唤醒 put 是为了避免信号不足
6)性能比较 主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较
Linked 支持有界,Array 强制有界
Linked 实现是链表,Array 实现是数组
Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
Linked 两把锁,Array 一把锁