Java多线程之CAS中的ABA问题与JUC的常见类
各位三连支持
一. CAS指令与ABA问题
1. 解析CAS
CAS即compare and awap
, 字面意思是比较并交换, 具体说就是将寄存器或者某个内存上的值A
与另一个内存上的值V
进行比较, 如果相同就将B
与需要修改的值V
进行交换, 并返回交换是否成功的结果.
我们假设内存中的原数据V
, 旧的预期值A
, 需要修改的新值B
, 具体涉及下面三个操作.
- 比较
A
与V
是否相等(比较). - 如果比较相等, 将
B
写入V
(交换), 不相等则不执行任何操作. - 返回操作是否成功.
在上述交换过程中, 大多数情况下并不关心B
后续的情况, 更关心的是V
这个变量的情况, 这里的交换, 也可以近似理解成 “赋值”.
伪代码如下:
boolean CAS(A, B, V) { if (A == V) { V = B; return true; } return false; } 1234567
CAS
最特殊的地方在于, 上述过程, 并非是通过一段代码实现的, 而是通过一条CPU
指令完成的, 该指令是具有原子性的, 是线程安全的.
2. 基于CAS实现的原子类
Java标准库中提供了基于CAS所实现的 “原子类”, 这些类的类名以Atomic
开头,针对常用的 int, long 等类型进行了封装, 它们可以基于CAS的方式进行修改, 并且保证线程安全性.
addAndGet(int delta) | i += delta |
decrementAndGet() | –i |
getAndDecrement() | i– |
incrementAndGet() | ++i |
getAndIncrement() | i++ |
这里举个例子, 典型的就是 AtomicInteger
类, 要实现多线程自增同一个变量, 其中的 getAndIncrement
相当于 i++
操作.
import java.util.concurrent.atomic.AtomicInteger; public class TestDemo25 { // 编写代码, 基于 AtomicInteger 实现多线程自增同一个变量 public static void main(String[] args) throws InterruptedException { AtomicInteger count = new AtomicInteger(0); Thread t1 = new Thread(() -> { for (int i = 0; i < 50000; i++) { // 这个方法就相当于 count++ count.getAndIncrement(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 50000; i++) { // 这个方法就相当于 count++ count.getAndIncrement(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count.get()); } } 12345678910111213141516171819202122232425
执行结果:
上面的代码就是基于CAS实现的++
操作, 不会存在线程安全问题, 这里既能够保证线程安全, 又要比 synchronized
高效, synchronized
会涉及到锁的竞争, 两个线程要相互等待, CAS
不涉及到线程阻塞等待.
上面所使用的AtomicInteger类方法getAndIncrement
实现的伪代码如下:
class AtomicInteger { private int value;//保存的值 //自增操作 public int getAndIncrement() { int oldValue = value; while ( CAS(value, oldValue, oldValue+1) != true) { oldValue = value; } return oldValue; } } 1234567891011
对于CAS
指令, 它的执行逻辑就是先判断value
的值与oldValue
的值是否相同, 如果相同就把oldValue+1
的值进写入到value
中(相当于将value
的值加1
), oldValue
可以理解成寄存器里的值, 相当于是先把value
变量内存中的值读至到寄存器当中, 在单线程环境中oldValue
的值与value
的值一定是相同, 但多线程环境下就不一定了, 因为value
的值随时都有可能被其他线程修改, 比如执行完读取value
到寄存器, 线程就发生切换了, 另外一个线程, 也进行修改了value
的值, 然后当这个线程再被调度回来后, 再进行CAS判定 ,就认为value和oldValue不相等了.
接着往下看while
循环, 该循环使用CAS
指令是否成功为判断条件, 如果CAS
成功了则退出循环, 此时value
的值已经加1
, 最终返回oldValue
; 如果CAS指令失败了; 这就说明有新线程提前对当前的value
进行了++
, value
的值发生了改变, 这这时候就需要将新的value
的值赋值给oldValue
, 然后尝试重新进行CAS操作, 这样就能保证有几个线程操作, 那就自增几次, 从而也就保证了线程安全.
结合下图, 当两个线程现指令交错的情况, 理解基于CAS指令实现的多线程自增操作是如何保证线程安全的.
3. 基于CAS实现自旋锁
CAS的应用场景处了实现原子类, 还能够实现自旋锁, 伪代码如下:
//自旋锁对象 public class SpinLock { //记录当前锁对象被哪个线程占用,为null,表示锁对象未被占用 private Thread ownerv = null; public void lock(){ // 通过 CAS 看当前锁是否被某个线程持有. // 如果这个锁已经被别的线程持有,那么就自旋等待. // 如果这个锁没有被别的线程持有,那么就把 owner 设为当前尝试加锁的线程. while(!CAS(this.owner, null, Thread.currentThread())){ } } public void unlock (){ this.owner = null; } } 123456789101112131415
上面CAS与自旋锁的逻辑为了监测当前锁对象是否被线程占用, CAS
监测当前的owner
是否是null
, 如果是null
, 就进行交换, 也就是把当前线程的引用赋值给owner
, 此时循环结束, 退出lock
方法, 加锁就完成了.
如果当前锁, 已经被别的线程占用了, CAS
就会发现, this.owner
不是null
, CAS
就不会产生赋值, 也同时返回false
, 循环继续执行, 并进行下次判定.
解锁的逻辑简单了, 将占用锁对象的线程(ownerv
)置为null
即可.
4. ABA问题
CAS
指令操作的核心的检查value
和oldValue
是否相同, 如果相同, 就视为value
中途没有被修改过, 所以进行下一步交换操作是没问题的, 在大部分情况下都能保证线程安全.
但这里有一种非常极端的情况, 这里说到的相同, value
的值可能是没改过的, 还有可能是value
的值被修改后又被改回到原来的值, 比如把value
的值设为A
的话, CAS
判定value为A
, 此时可能确实value
始终是A
, 也可能是value
本来是A
, 然后被改成了B
, 最后又还原成了A
.
上数说到的极端情况就是CAS中的ABA问题, 在一些极端场景下就会有bug存在, 比如下面的场景.
有一天, 滑稽老铁要到ATM机去取款, 假设当前滑稽的账户余额1000
, 滑稽准备取500
, 当滑稽老铁按下取款的这一瞬间, 机器卡了一下, 滑稽下意识就多按了一下, 如果考虑使用CAS的方式来扣款, 系统扣款的情况可能是下图所示:
正常情况下, 即使按下两次取款按钮最终的结果也是正常的, 但考虑一种极端情况, 如果在第一次CAS成功后的一瞬间, 滑稽老铁的朋友又给给滑稽转账了500
, 导致第一次CAS扣款500
后的余额从500
又变回到了1000
, 然后紧接着第二次CAS操作也会执行成功, 又成功扣款500
, 最终余额变成了500
, 这种结果显然是不合理的, 而正确的程序应该是第二次CAS仍然失败, 最终余额为1000
元.
上述描述场景是极端的情况, 发生的概率是非常非常低的, 一方面, 恰好滑稽这边多按了几次产生多个扣款操作, 另一方面, 又赶巧在这个非常极限的时间内, 有人转账了一样的金额…
不过上述ABA问题在极端可能下造成的bug也是有办法解决的, 可以针对当前问题引入一个版本号, 假设初始版本号是1
, 版本号只能增加不能减少, 每次修改版本号都+1
, 然后进行CAS的时候, 就不是以金额值为基准了, 而是以版本号为基准, 在进行CAS操作之前, 都要对版本号进行验证, 如果版本号与之前加载的版本号不同, 则放弃此次CAS指令操作, 看下图理解, 这样最终的结果就是正确的了.
二. JUC中的常见类
Java中的JUC就是来自java.util.concurrent
包下的一些标准类或者接口, 放的都是并发编程(多线程)相关的组件.
1. Callable接口
常见的创建线程的方式有两种方式, 第一种方法是直接继承Thread
类, 重写run
方法, 第二种方法是实现Runnable
接口, 然后还是要靠Thread
类的构造器, 把Runnable
传进去, 最终调用的就是Runnable
的run
方法。; 和Runnable
类似, 我们还可以通过Callable
接口描述一个任务配合FutureTask
类来创建线程, 和Runnable
不同的是, Callable
接口配合FutureTask
类所创建的线程其中的任务是可以带有返回值的, 而一开始提到的那两种方式任务是不支持带返回值的.
Callable
接口中有一个call
方法(返回值是泛型参数), 就相当于Runnable
接口中的run
方法(无返回值), FutureTask
可用于异步获取执行结果或取消执行任务的场景, 通过传入Runnable
或者Callable
的任务给FutureTask
, 直接调用其run
方法或者放入线程池执行, 之后可以在外部通过FutureTask
的get
方法异步获取执行结果, 如果任务还没有执行完毕, get
方法会阻塞直到任务返回结果.
理解FutureTask
可以为想象去吃麻辣烫, 当餐点好后, 后厨就开始做了, 同时前台会给你一张 “小票”, 这个小票就是FutureTask
, 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没.
使用Thread
类的构造器创建线程的时候, 传入的引用不能是Callable
类型的, 而应该是FutrueTask
类型, 因为构造器中传入的任务类型需要是一个Runnable
类,Callable
与Runnable
是没有直接关系的, 但FutrueTask
类实现了Runnable
类, 所以要想使用Callable
创建线程, 我们就需要先把实现Callable
接口的对象引用传给FutrueTask
类的实例对象, 再将FutrueTask
实例传入线程构造器中.
总结一下就是, 我们可以用Callable用来描述任务, FutureTask类用来管理Callable任务的执行结果.
比如, 我们使用Callable
来计算 1 + 2 + 3 + … + 1000 的值, 并通过返回值的方式获取执行结果.
代码示例:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class TestDemo26 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 使用 Callable 来计算 1 + 2 + 3 + ... + 1000 Callable<Integer> callable = new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i <= 1000; i++) { sum += i; } return sum; } }; FutureTask<Integer> futureTask = new FutureTask<>(callable); Thread t = new Thread(futureTask); t.start(); //获取执行结果 Integer sum = futureTask.get(); System.out.println(sum); } } 12345678910111213141516171819202122232425
执行结果:
2. ReentrantLock类(可重入锁)
ReentrantLock
是除了synchronized
外标准库给我们提供的另一种可重入锁, 与synchronized
不同的是, ReentrantLock是通过lock
方法加锁,unlock
方法解锁, 相比于synchronized
直接基于代码块的方式来加锁解锁更加传统.
正是由于加锁解锁两个操作是分开的, 所以代码的写法上需要格外注意, 一方面lock
后如果之后的工作代码比较长久容易忘记去unlock
从而造成死锁, 另一方面加锁后解锁前中间的代码万一出了问题(比如直接return
或者出现异常), 都可能导致不能顺利执行unlock
造成死锁.
ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); // working reentrantLock.unlock(); 1234
所以使用ReentrantLock
类时, 一般要搭配finally
使用, 将unlock
放入到finally
保证unlock
一定会执行.
ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); try { // working } finally { reentrantLock.unlock(); } reentrantLock.unlock(); 12345678
🎯ReentrantLock
相较于synchronized
的优势:
ReentrantKLock
提供了公平锁版本的实现, 而synchronized
只实现了非公平锁,ReentrantKLock
默认是非公平锁, 而公平锁版本只需要指定fair参数为true即可..- 对于
synchronized
来说, 提供的加锁操作就是"死等", 也就是说如果锁已经被占用, 只要锁没释放就需要一直等下去,而ReentrantLock
提供了更灵活的等待方式:tryLeock
.
boolean trylock() | 无参数, 能加锁就加, 加不上就放弃 |
boolean trylock(long timeout, TimeUnit unit) | 有参数, 超过指定时间, 加不上锁就放弃 |
ReentrantLock
提供了一个更方便的等待通知机制,synchronized
搭配的是wait, notify
, 当我们notify
的时候是随即唤醒一个wait
状态的线程; 而ReentrantLock
搭配一个Condition
类, 进行唤醒的时候可以唤醒指定线程.
3. Semaphore类(信号量)
Java中信号量(
Semaphore
)是把操作系统原生的信号量封装了一下, 本质就是一个计数器, 描述了 “可用资源的个数”,主要涉及到两个操作
P
操作: 申请一个可用资源, 计数器
-1
.
V
操作: 释放一个可用资源, 计数器
+1
.
如果计数器为0
了, 继续Р操作, 就会出现阻塞等待的情况.
🍂举个例子来理解信号量:
会开车的应该经常会碰到, 停车, 停车场门口有一个灯牌, 会显示停车位还剩余多少个, 每进去一辆车, 显示的停车位数量就-1
, 就相当于进行了一次P
操作, 每出去一辆车, 显示的停车位数量就+1
, 就相当于进行了一次V
操作; 而当停车场的剩余车位为0
时, 显示的停车位数量就为0
了, 此时如果还有车想停, 要么在这里等, 要么就去其他停车场.
🎯Semaphore
类的常用方法:
- 构造方法
public Semaphore(int permits, boolean fair) | 构造可用资源为permits个的信号量对象, 并指定信号量是否是公平性质的 |
- PV方法
Semaphore的PV操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.
public void release() | 释放可用资源 |
代码示例:
- 创建
Semaphore
示例, 初始化为4
, 表示有4
个可用资源. acquire
方法表示申请资源(P
操作),release
方法表示释放资源(V
操作).- 创建
20
个线程, 每个线程都尝试申请资源,sleep1秒
之后, 释放资源. 观察程序的执行效果.
import java.util.concurrent.Semaphore; public class Test { public static void main(String[] args) { Semaphore semaphore = new Semaphore(4); Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("申请资源"); semaphore.acquire(); System.out.println("我获取到资源了"); Thread.sleep(1000); System.out.println("我释放资源了"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < 20; i++) { Thread t = new Thread(runnable); t.start(); } } } 12345678910111213141516171819202122232425262728
执行结果:
考虑这里一个场景, 假设有一个计数器初始值为1
的信号量, 针对这个信号量的值, 就只有1
和0
两种取值, 因为信号量不能是负的,
- 执行一次
Р
操作,1->0
- 执行一次
V
操作,0->1
如果已经进行一次Р
操作了, 继续进行Р
操作, 就会阻塞等待, 这是不是和锁的功效有点类似呢?
锁就可以视为 “二元信号量”, 可用资源就1
个, 计数器的取值非0
即1,
可以说, 锁是信号量的一种特殊情况, 信号量就把锁推广到了一般情况, 描述了可用资源更多的时候是如何处理的.
所以说, 计数器初始值为1
的信号量就可以当成锁来使用, 这里我们编写一个代码实现两个线程增加同一个变量, 使用Semphore
来控制线程安全.
import java.util.concurrent.Semaphore; public class TestDemo27 { // 编写代码实现两个线程增加同一个变量, 使用 Semphore 来控制线程安全. public static int count = 0; public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(1); Thread t1 = new Thread(() -> { for (int i = 0; i < 50000; i++) { try { semaphore.acquire(); count++; semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 50000; i++) { try { semaphore.acquire(); count++; semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); } } 123456789101112131415161718192021222324252627282930313233343536
执行结果:
4. CountDownLatch同步工具类
CountDownLatch
是一个同步工具类, 用来协调多个线程之间的同步, 或者说起到线程之间的通信(而不是用作互斥的作用).
CountDownLatch
能够使一个线程在等待另外一些线程完成各自工作之后, 再继续执行; 使用一个计数器进行实现, 计数器初始值为线程的数量; 当每一个线程完成自己任务后, 计数器的值就会-1
, 当计数器的值为0
时, 表示所有的线程都已经完成一些任务, 然后在CountDownLatch
上等待的线程就可以恢复执行接下来的任务.
想象一个场景进行理解, 假设有一场跑步比赛, 比赛开始时间是明确的, 裁判的发令枪响了比赛就开始了, 但结束时间是不确定的, 只有当所有的选手都冲过了终点线才算结束, 这里的运动员就相当于线程, 而CountDownLatch
类用判断什么时候最后一个远动员冲过终点线, 即这些线程在什么时候可以全部执行结束.
🎯CountDownLatch
类常用方法:
各位三连支持