JUC保姆级教程,轻松面对多线程面试
- 2021年的作者来信:
Hi,你好呀!本文是JUC的一个分享,里面有JUC的入门和面试知识,建议手打理解一遍。
当初面试阿里的时候就考了手写多线程,所幸自己有尝试过总结本文,动手能力还行过了。祝你也能从中有所收获。
多线程
如果你对多线程没什么了解,那么从入门模块开始。 如果你已经入门了多线程(知道基础的线程创建、死锁、synchronized、lock等,并不代表后面不讲,只是后面需要有基础才好深入),那么从juc模块开始。(点此跳转 )
入门模块
基本概念解释
- 程序:静态代码,一串指令的集合
- 进程:资源分配的单位
- 线程:调度和执行的单位,有独立的运行栈和程序计数器
- 并行:多个CPU同时执行多个任务
- 并发:单个CPU同时执行多个任务
多线程的创建 & 使用
方法1:继承Thread类
步骤:
extends Thread
- 重写
run()
方法 - 创建该类的子对象
- 调用子对象的
start()
方法
class PrimeThread extends Thread {
public void run() {
// compute primes larger than minPrime
}
}
//使用方式
PrimeThread p = new PrimeThread();
p.start();
方法2:实现Runnable接口
步骤:
implements Runnable
- 实现
run()
方法 - 创建子对象
- 将子对象传入
new Thread()
构造性的线程 - 调用线程的
start()
方法 注意事项:
一般我们采用实现接口的方式,而不是选择方法一,因为我们可以通过同意对象传造出多个线程,这样可以做到数据域共享,不过并不代表线程安全了!
class PrimeRun implements Runnable {
long minPrime;
PrimeRun(long minPrime) {
this.minPrime = minPrime;
}
public void run() {
// compute primes larger than minPrime
}
}
//使用方式
PrimeRun p = new PrimeRun(143);
new Thread(p).start();
方法3:实现Callable接口
好处:
- 相比实现Runnable,实现该接口方法call()支持泛型返回值
- call()方法可以抛出异常
- 可以借助FutureTask类获取返回结果
普及一下FutureTask
FutureTask类能通过get()方法,获取实现Callable接口类的返回值,该类是Future接口的唯一实现类
步骤:
implements Callable
- 重写call()方法
- 创建子对象
- 构造一个FutureTask类的实例,传入参数为子对象
- 构造一个Thread类的示例并
start()
,传入参数为FutureTask类的示例 - 调用FutureTask类的实例的get(),获取结果
示例:
package com.zzt;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
* 实现Callable接口
*/
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class myCall implements Callable<Integer>{
@Override
public Integer call() throws Exception {
int num=0;
for(int i=1;i<=100;i++){
num+=i;
}
return num;
}
}
public class MyThread {
public static void main(String[] args) {
myCall call=new myCall();
FutureTask<Integer> futureTask=new FutureTask(call);
new Thread(futureTask).start();
try {
Integer num=futureTask.get();
System.out.println(num);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
方法4:使用线程池
好处:
- 提高响应速度(减少创建时间)
- 降低资源消耗(可重用)
- 便于线程管理
ExecutorService
ExecutorService是线程池的接口,实现类为ThreadPoolExecutor,Executors是线程池创建的工具类
步骤:
- 使用Executors工具类创建线程池,返回ThreadPoolExecutor
- 设置线程池所需参数
- 调用ThreadPoolExecutor类执行方法 --> execute、submit等
线程池的常用方法:
Executors工具类(创造线程池)的常用方法:
newCacheThreadPool() //根据需求创造线程池
newFIxedThreadPool() //可重用的固定线程池(常用方法)
newSingleThreadExecutor() //单个线程的线程池
newScheduledThreadPool() //创建一个线程池。在给定安排的延迟后执行
ThreadPoolExecutor(线程池)的常用方法:
executoe() //传入实现Runnable接口的实例
submit() //传入实现Callable or Runnable接口的实例
/*
submit() 一般是传入实现Callable接口的实例
然后用FutureTask类接收返回值
*/
shutdown() //关闭线程池
设置线程池参数!
corePoolSize() //设置大小
maximumPoolSize() //最大线程数
keepAliveTime() //线程无任务时,最多保持多长时间后终止
示例:
package com.zzt;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池的使用示例
**/
class myRunnable implements Runnable{
@Override
public void run(){
System.out.println("执行完成");
}
}
public class MyThreadPool {
public static void main(String[] args) {
// 1. 使用工具类初始化线程池
ThreadPoolExecutor service = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// 2. 实例化实现Runnable接口的对象
myRunnable myRunnable = new myRunnable();
// 3. 执行线程
service.execute(myRunnable);
// 4. 关闭线程池
service.shutdown();
}
}
线程的一些常用方法
Thread.current() //这个方法可以获取当前类的线程,≈this
start() //启动线程
run() //需要重写的方法
setName() //设置线程名字
yield() //将控制权暂时给别的线程
join() //插入下运行并完成,需要先启动start
stop() //强制结束,过时方法,不推荐使用!!
sleep() //休息(ms)
setPriority(int) //设置优先级 1~10
getPriority() //获取优先级
/*
优先级最大 --- Thread.Max_PRIORITY --- 10
优先级最低 --- Thread.MIN_PRIORITY --- 1
常规优先级(默认) --- Thread.NORM_PRIORITY --- 5
并不是优先级越高就越快完成,只是率先完成的概率大一点
*/
重点:线程的同步问题
使用多线程的时候,操纵访问同一个数据域就有可能出现同步问题。简而言之,假设我们对一个数据修改了,然后要输出修改后的数据,可是修改完后的一瞬间又有线程修改了数据,这样我们看到的是经过二次修改的数据,而不是当此修改完的数据,显示会出现错误。
线程同步问题的解决方式
方法1:同步代码块
把需要同步的代码写在synchronized中,括号为同步监视器(俗称锁),同步监视器可以是仍以object对象,不过要注意需要同步的要使用同一个锁,不然无法实现效果
synchronized(this){
//代码块
}
/*
若使用实现接口的方式创建的线程,那么可以用this作为同步监视器
若使用继承Thread类的方式创建线程,使用this会导致锁不一样,所以采用 ---> 类名.class 的方式更好
当然如果自己写了一个类当锁也可以
*/
我们也可以在进入同步代码块前加一些条件判断,这样可以提高效率,不需要一个个线程等着排队,这个在我们同步代码块里面有条件判断的时候适用,看情况而定。
方法2:同步方法
同步方法其实就是把同步监视器省去了,在方法修饰符后加上synchronized,本质同步监视器就是:当前类本身(静态方法的当前类)or this(非静态方法的当前对象)
public synchronized void test(){
//代码
}
注意我们这样保护线程安全,要保证是对一个对象作为锁,才是安全的
方法3:同步锁
这个是JDK1.5+推出的,推荐使用这个,灵活性更强
class X{
//定义锁对象
private final ReentrantLock lock=new ReentrantLock();
//定义需要保证线程安全的方法
public void m(){
//加锁
lock.lock();
try{
//...method body
}
//使用finally块来保证释放锁
finally{
lock.unlock();
}
}
}
包含两个方法:
- lock() --> 上锁 --> 写在try之前
- unlock() --> 解锁 --> 写在finally内
死锁现象
当我们出现两个线程共同争夺一把锁,都互相抓着不放手的时候就会出现死锁现象
死锁现象的代码演示:
package com.zzt;
/**
* 多线程死锁演示
*
* @author ZZT
*
*/
public class MyThread {
public static void main(String[] args) {
Object s1=new Object();
Object s2=new Object();
new Thread() {
@Override
public void run() {
synchronized (s1) {
System.out.println("进入第一线程 -- 获得锁s1");
try {
Thread.sleep(100);
}catch(InterruptedException e) {
e.printStackTrace();
}
synchronized (s2) {
System.out.println("进入第一线程 -- 获得锁s2");
}
}
}
}.start();
new Thread() {
@Override
public void run() {
synchronized (s2) {
System.out.println("进入第二线程 -- 获得锁s2");
try {
Thread.sleep(100);
}catch(InterruptedException e) {
e.printStackTrace();
}
synchronized (s1) {
System.out.println("进入第二线程 --获得锁s1");
}
}
}
}.start();
}
}
/**
增加sleep()是为了提高死锁概率,方便演示
**/
结果就是:
进入第一线程 -- 获得锁s1
进入第二线程 -- 获得锁s2
二者都不能执行完,程序会卡着不动
会发现第一个线程拿了s1执行完需要拿s2,可是第二个线程拿了s2后继续执行需要拿s1。这时候就会产生矛盾,线程1需要在拿到s1后拿到s2才能执行完全部,而s2被线程2抓着不放,所以产生死锁。
避免死锁的建议
平常使用中要减少嵌套同步,减少使用同步资源,优化算法(比如少sleep) --- 死锁的出现是有概率的,但是有可能就要处理
线程的通信
notify() //唤醒一个线程,根据优先级唤醒
notifyAll() //唤醒全部线程
wait() //休息,并释放同步监视器!
这三个方法必须在 同步方法 or 同步代码块中使用,调用对象都是同步监视器(锁)
面试题:
问:synchronized和lock有什么异同?
答:
- 同:二者都能作为解决线程安全问题的方式
- 异:synchronized是执行完方法后自动释放同步监视器,而lock需要手动启动和释放同步监视器
注:后面进阶模块有更详细的解释~
问:sleep()和wait()有什么异同?
答:
- 同:都可以使当前进程进入阻塞态
- 异:
- sleep()在Thread中,wait()在Obejct中;
- 范围不同。wait()必须在同步代码块or同步方法内使用,而sleep()则不需要;
- sleep()不释放锁,而wait()释放锁。
问:写一个饿汉式和懒汉式的单例模式
答:
- 饿汉式(线程安全的)
public class X{
private static X a=new X();
private X(){}
public static X getInstance(){
return a;
}
}
- 懒汉式(线程不安全,所以要加synchronized变成线程安全)
public class X{
private static X a;
private X(){}
private static Object key = new Obejct();
public static X getInstance(){
if(a == null){//提高效率
synchronized (key) {//线程安全模式
if(a==null){
a = new X();
}
}
}
return a;
}
}
JUC进阶模块
先回答几个问题再往下走
线程基础知识
- 进程:资源分配的单位
- 线程:调度和执行的基本单位
一个进程往往可以包含多个线程,至少包含一个!
问:Java默认有几个线程?
答:2 个 ---> mian、GC
1. java真可开线程?
回答:不行!实际上 private native 修饰的方法调用c++开辟的
2. 并发和并行的区别?
-
并发:多线程操作同一个资源
· CPU 一核 ,模拟出来多条线程,快速交替达到效果
-
并行:多个任务同时执行 · CPU 多核 ,多个线程可以同时执行
并发编程的本质:充分利用CPU的资源
3. java线程有几个状态?
六个!
public enum State {
// 新生
NEW,
// 运行
RUNNABLE,
// 阻塞
BLOCKED,
// 等待
WAITING,
// 超时等待
TIMED_WAITING,
// 终止
TERMINATED;
}
4. sleep和wait区别?
-
来自的类
sleep ----> Thread
wait -----> Object
-
锁释放问题
sleep ----> 不释放锁
wait -----> 释放锁
-
作用范围
sleep ----> 用于线程中任何地方
wait -----> 只能用于同步代码块、同步方法中
5. synchronized和lock锁的区别
Lock主要的类:ReentrantLock
- synchronized为关键字,lock为类
- synchronized不可判断当前状态,lock可以
- synchronized自动释放锁,lock手动释放锁
- synchronized适合锁少量代码,lock适合大量
- synchronized不可中断,lock可以中断
- synchronized非公平锁(可插队),lock可以修改为公平锁(默认非公平)
6. 生产者消费者问题
一般该问题都分为三步:判断等待 ---> 处理业务 ---> 唤醒线程
synchronized版本
需要注意的地方:注意判断条件不可以用if,再多几个线程会出问题,必须要使用while
/***
* @author: G_night
* 转载请声明作者
* Reprint please state the author
***/
public class producer_consumer {
public static void main(String[] args) {
Person person = new Person();
for (int i=0;i<6;i++){
new Thread(()->{
person.produce();
},"producer").start();
new Thread(()->{
person.consume();
},"consumer").start();
}
}
}
class Person{
int num=0;//产品数量
public synchronized void produce(){
while(num!=0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
this.notifyAll();
System.out.println("生产了商品,目前商品存余:"+num);
}
public synchronized void consume(){
while(num==0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
this.notifyAll();
System.out.println("生产了商品,目前商品存余:"+num);
}
}
lock版本
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/***
*
*
* @author: G_night
* 转载请声明作者
* Reprint please state the author
***/
public class producer_consumer {
public static void main(String[] args) {
Person person = new Person();
for (int i=0;i<6;i++){
new Thread(()->{
person.produce();
},"producer").start();
new Thread(()->{
person.consume();
},"consumer").start();
}
}
}
class Person{
private int num=0;//产品数量
Lock lock=new ReentrantLock();
Condition condition=lock.newCondition();
public void produce(){
lock.lock();
try{
while(num!=0){
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
condition.signalAll();
System.out.println("生产了商品,目前商品存余:"+num);
}finally {
lock.unlock();
}
}
public void consume(){
lock.lock();
try{
while(num==0){
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
condition.signalAll();
System.out.println("生产了商品,目前商品存余:"+num);
}finally {
lock.unlock();
}
}
}
按指定顺序执行线程
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/***
* @author: G_night
* 转载请声明作者
* Reprint please state the author
***/
public class PrintABC {
public static void main(String[] args) {
Print print = new Print();
for(int i=0;i<6;i++){
new Thread(()->{
print.printA();
},"a").start();
new Thread(()->{
print.printB();
},"b").start();
new Thread(()->{
print.printC();
},"c").start();
}
}
}
class Print{
private int num=1;//区分三者执行条件
Lock lock=new ReentrantLock();
Condition conditionA=lock.newCondition();
Condition conditionB=lock.newCondition();
Condition conditionC=lock.newCondition();
public void printA(){
lock.lock();
try{
while(num!=1){
conditionA.await();
}
System.out.println(Thread.currentThread().getName()+"执行了");
num=2;
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try{
while(num!=2){
conditionB.await();
}
System.out.println(Thread.currentThread().getName()+"执行了");
num=3;
conditionC.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try{
while(num!=3){
conditionC.await();
}
System.out.println(Thread.currentThread().getName()+"执行了");
num=1;
conditionA.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
7. 什么是可重入锁?
可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁。ReentrantLock和synchronized都是可重入锁 。
也就是说一个线程获取了某个锁,执行过程中亦可以正常获得该锁,不会出现死锁。
import java.util.concurrent.locks.ReentrantLock;
public class suo {
public static void main(String[] args) {
suotry suotry = new suotry();
suotry.say();
}
}
class suotry{
public synchronized void say(){//获得了锁
System.out.println("------------say-----------");
hello();//进入hello
}
public synchronized void hello(){//允许再次获得锁(搭配say看)
System.out.println("==========hello============");
}
}
深入理解锁
8锁现象,实际对应的就是8个问题,学完以后可以更好理解锁,明白锁的对象
synchronized锁对象的区分
当synchronized修饰的方法为static的时候,锁的对象是 class类模板
当sychronized修饰的方法为非静态方法的时候,锁的对象是 实例对象
例子1
下面先打印A还是B?
import java.util.concurrent.TimeUnit;
/***
* @author: G_night
* 转载请声明作者
* Reprint please state the author
***/
public class lock8 {
public static void main(String[] args) {
PrintAB printAB = new PrintAB();
new Thread(()->{
printAB.A();
},"a").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
printAB.B();
},"b").start();
}
}
class PrintAB{
public synchronized void A(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("打印A");
}
public synchronized void B(){
System.out.println("打印B");
}
}
答案:先打印A 然后 打印B
原因:二者锁是同一个对象printAB(实例),A拿到以后虽然休息但是不释放锁,所以B要等A释放以后才可以执行
例子2
下面先打印A还是B?
import java.util.concurrent.TimeUnit;
/***
* @author: G_night
* 转载请声明作者
* Reprint please state the author
***/
public class lock8 {
public static void main(String[] args) {
PrintAB printAB = new PrintAB();
new Thread(()->{
printAB.A();
},"a").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
printAB.B();
},"b").start();
}
}
class PrintAB{
public synchronized static void A(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("打印A");
}
public synchronized void B(){
System.out.println("打印B");
}
}
答案:先打印B 然后 打印A
原因:A的锁是PrintAB这个类模板(唯一的!!),B的锁是printAB这个实例对象,在A休息4s的时间内,B仍然可以正常执行,故先打印B然后打印A
集合类不安全
list、set、map都线程不安全,多线程情况下启动多个线程去修改集合类,可能会报错 java.util.ConcurrentModificationException
线程安全的方法
那么就要想办法实现线程安全。
-
使用
Collections.synchronized~
创建List、set、map(注:~的意思是可以接上List、Set、Map等词) -
使用
CopyOnWriteArray~
读写分离创建集合(后面有讲到读写锁)写时复制,CopyOnWrite容器即写时复制的容器,往一个容器中添加元素的时候,不直接往当前容器Object[]添加,而是先将Object[]进行copy,复制出一个新的容器object[] newElements,然后新的容器Object[] newElements里添加原始,添加元素完后,在将原容器的引用指向新的容器 setArray(newElements)。
好处:可以对copyOnWrite容器进行并发读取,而不需要加锁,因为当前容器不需要添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/***
* @author: G_night
* 转载请声明作者
* Reprint please state the author
***/
public class collection_thread {
public static void main(String[] args) {
//线程安全的方法1
List list = Collections.synchronizedList(new ArrayList<>());
Set set = Collections.synchronizedSet(new HashSet<>());
//线程安全的方法2
List copyOnWriteArrayList=new CopyOnWriteArrayList();
Set copyOnWriteArraySet=new CopyOnWriteArraySet();
//hashmap线程安全的方法
Map<String, String> map = new ConcurrentHashMap<>();
//演示
for (int i = 1; i <=30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring( 0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
在了解ConcurrentHashMap之前还要了解一下什么是CAS操作
CAS操作
CAS:Compare and Swap,即比较再交换。区别于synchronized的悲观锁,CAS可以算是一种乐观锁。
CAS简单理解
CAS是一种无锁算法,CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
假设t1在与t2线程竞争中线程t1能去更新变量的值,而其他线程都失败。(失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次发起尝试)。t1线程去更新变量值改为num+1,然后写到内存中。此时对于t2来说,内存值变为了num+1,与预期值num不一致,就操作失败了。
CAS问题
CAS(比较并交换)是CPU指令级的操作,只有一步原子操作,所以非常快。而且CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了。但CAS就没有开销了吗?
- 不!有cache miss的情况。
CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就 一直循环!
缺点:
1、 循环会耗时
2、一次性只能保证一个共享变量的原子性
3、ABA问题
ABA问题
过程中数值被改变但是又改回了原来的值,虽然交换成功但是其实原本对象以及被改变了。
ABA的危害
假设一个栈有多个数据,前面两个数据被一个线程取走,第三个数据又和第一个一样,此时原本要修改第一个数据的线程通过cas操作,发现第三个数据和第一个数据相等,然后修改了第三个数据,那么就会造成问题。
package CAS;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.atomic.AtomicInteger;
public class casDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
// ============== 捣乱的线程 ==================
System.out.println(atomicInteger.compareAndSet(1, 2));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2, 1));
System.out.println(atomicInteger.get());
// ============== 期望的线程 ==================
System.out.println(atomicInteger.compareAndSet(1, 3));
System.out.println(atomicInteger.get());
}
}
解决方式
加入版本号一起对比,就可以防止ABA问题。
package CAS;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class casDemo {
// 业务中,一般比较的是一个个对象。第二个参数是初始版本号
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
public static void main(String[] args) {
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("版本号 => "+stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Lock lock = new ReentrantLock(true);
atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println("版本号 => "+atomicStampedReference.getStamp());
System.out.println("捣乱线程执行:"+atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println("版本号 => "+atomicStampedReference.getStamp());
},"a").start();
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("期望获取的版本号 => "+stamp);
//休息长时间,让捣乱线程执行
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("期望线程执行:"+atomicStampedReference.compareAndSet(1, 6,
stamp, stamp + 1));
System.out.println("版本号 => "+atomicStampedReference.getStamp());
},"b").start();
}
}
ConcurrentHashMap底层原理 (JDK1.8)
重要成员变量
① table:默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。 ② nextTable:默认为null,扩容时新生成的数组,其大小为原数组的两倍。 ③ sizeCtl :默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
- -1 代表table正在初始化
- -N 表示有N-1个线程正在进行扩容操作
其余情况: 1、如果table未初始化,表示table需要初始化的大小。 2、如果table初始化完成,表示table的容量,默认是table大小的0.75倍,居然用这个公式算0.75(n - (n >>> 2))。 Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。
put操作描述
put操作采用 CAS+synchronized 实现并发插入或更新操作:
-
根据key进行两次hash算法得到hash值。
-
判断Node数组是否为空,如果为空进行初始化。
-
根据hash值得出所在的数组的位置,并判断当前数组里有没有链表存在,没有就通过CAS操作将元素加入到当前位置中。
-
如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制
-
如果当前数组位置已经存在元素了,就先用synchronized加锁,然后再判断当前位置是链表,还是红黑树,再对比hash值和equls,hash值相同的,如果key相同就覆盖,key不相同就挂在当前链表后面,hash值不同,就挂在新节点上。
-
如果是树的话,则调用putTreeVal方法把这个元素添加到树中去,最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组
扩容描述
当table的元素数量达到容量阈值sizeCtl,需要对table进行扩容:
- 构建一个nextTable,大小为table两倍
- 把table的数据复制到nextTable中。
- 同时检测到某个table链表元素小于6个了的红黑树,就会自动把红黑树又转为链表结构。
如何在扩容时,并发地复制与插入?
- 遍历整个table,当前节点为空,则采用CAS的方式在当前位置放入fwd
- 当前节点已经为fwd(with hash field “MOVED”),则已经有有线程处理完了了,直接跳过 ,这里是控制并发扩容的核心
- 当前节点为链表节点或红黑树,重新计算链表节点的hash值,移动到nextTable相应的位置(构建了一个反序链表和顺序链表,分别放置在i和i+n的位置上)。移动完成后,用Unsafe.putObjectVolatile在tab的原位置赋为为fwd, 表示当前节点已经完成扩容。
JDK1.7&JDK1.8的区别
JDK 1.7:
- 同步机制: 分段锁,每个segment继承ReentrantLock
- 存储结构:数组+链表
- 键值对: HashEntry
- put操作: 多个线程同时竞争获取同一个segment锁,获取成功的线程更新map;失败的线程尝试多次获取锁仍未成功,则挂起线程,等待释放锁
- size的实现:统计每个Segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的。先采用不加锁的方式,连续计算元素的个数,最多计算3次:如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数
JDK 1.8:
- 同步机制: CAS + synchronized保证并发更新
- 存储结构:数组+链表+红黑树
- 键值对:Node
- put操作:访问相应的bucket时,使用sychronizeded关键字,防止多个线程同时操作同一个bucket,如果该节点的hash不小于0,则遍历链表更新节点或插入新节点;如果该节点是TreeBin类型的节点,说明是红黑树结构,则通过putTreeVal方法往红黑树中插入节点;更新了节点数量,还要考虑扩容和链表转红黑树
- size的实现:通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数
思考
为什么JDK1.8做了如此改动?
总结如下思考
-
JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是首节点
-
JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加
-
JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
-
JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,原因如下:
1.因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中 ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优 势就没有了
2.JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的 关键字比使用API更加自然
3.在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶 颈,但是也是一个选择依据
AQS介绍
AQS 的全称为(AbstractQueuedSynchronizer),这个类在 java.util.concurrent.locks 包下面。
AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,
诸如 ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7) 等等皆是基于 AQS 的。
AQS 核心思想:
(1)如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
(2)如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
介绍:CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。
AQS状态标识
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
//返回同步状态的当前值
protected final int getState() { return state; }
// 设置同步状态的值
protected final void setState(int newState) { state = newState; }
//使用CAS操作将同步状态值设置为给定值update(保证原子性)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
常用辅助类
这里介绍常用的辅助类,必须熟练掌握,都是基于AQS实现的,如果要深入了解源码可以阅读更多文章,这里主要介绍如何使用。
CountDownLatch
每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续执行!
示例代码:
package CommonClassDemo;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要执行任务的时候,再使用!
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" finished");
countDownLatch.countDown(); // 数量-1
},String.valueOf(i)).start();
}
countDownLatch.await(); // 等待计数器归零,然后再向下执行
System.out.println("All Finished");
}
}
如果缺少该类的限制会怎么样?
会提前执行All Finished,与预期效果不符合
注意:CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
CyclicBarrier
CountDownLatch的实现是基于AQS的,而CycliBarrier是基于 ReentrantLock(ReentrantLock也属于AQS同步器)和 Condition 的.
CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程调用await
方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞,直到数目达到parties
才会执行。
package CommonClassDemo;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 执行7次,即可退出
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("退出成功!");
});
for (int i = 1; i <=7 ; i++) {
final int temp = i; // lambda不能操作到 i,所以加一个final的temp
new Thread(()->{
System.out.println("第"+Thread.currentThread().getName()+"线程执行完毕");
try {
cyclicBarrier.await(); // 等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
CyclicBarrier 和 CountDownLatch 的区别?
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为0时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
countDown()方法计数减一,会继续执行自己的任务。 await()方法只进行阻塞,不改变计数 |
await()方法计数加1,如果未达到指定的数字则线程阻塞 所有线程任务结束之后,才会进行后续任务** |
不可重复利用 | 可重复利用 |
Semaphore
设置固定数目给线程进入执行,在 semaphore.acquire() 和 semaphore.release()之间的代码,同一时刻只允许制定个数的线程进入。
package CommonClassDemo;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" 进入");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" 离开");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release() 释放
}
},String.valueOf(i)).start();
}
}
}
读写锁
读写锁维护一对关联的锁,一个用于只读,一个用于写入。在同一时间内,允许多个线程同时读取,但是只允许一个线程写入。
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/***
* @author: G_night
* 转载请声明作者
* Reprint please state the author
***/
public class ReadWriteLockDemo {
public static void main(String[] args) {
myRWLock myRWLock = new myRWLock();
for(int i=1;i<=10;i++){
final int t=i;
new Thread(()->{
myRWLock.put(t,t);
},"Write_Thread_"+String.valueOf(t)).start();
}
for(int i=1;i<=10;i++){
final int t=i;
new Thread(()->{
myRWLock.get(t);
},"Read_Thread_"+String.valueOf(t)).start();
}
}
}
class myRWLock{
//初始化读写锁
ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
//存储数据的map
HashMap<Integer,Integer> map=new HashMap<>();
//存入
public void put(int k,int v){
lock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"开始写入");
map.put(k,v);
System.out.println(Thread.currentThread().getName()+"结束写入");
}finally {
lock.writeLock().unlock();
}
}
public void get(int v){
lock.readLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"开始读取");
System.out.println("获取:"+map.get(v));
System.out.println(Thread.currentThread().getName()+"结束读取");
}finally {
lock.readLock().unlock();
}
}
}
运行结果:写入过程的操作不会有其他线程插队,而读取允许插队
阻塞队列&同步队列
**阻塞队列 **:当写满的时候继续写入会进入阻塞状态,当取东西时为空的时候也可以进入阻塞状态等待写入的东西。
方式 | 抛出异常 | 有返回值 | 阻塞 等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | put | offer |
移除 | remove | poll | take | poll |
获取元素 | element | peek | \ | \ |
阻塞等待和超时等待的区别:阻塞等待会一直等,而超时等待可以自己设置等待的时间,若没有就不再等待。
用法: blockingQueue.poll ( 2 , TimeUnit.SECONDS ) ;
第一个参数是数值,第二个是单位。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
blockingQueue.offer("d",2, TimeUnit.SECONDS); // 等待超过2秒就退出,
// 这时候如果有其他线程取出数值,那么阻塞就会提前结束
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
blockingQueue.poll(2,TimeUnit.SECONDS); // 等待超过2秒就退出
}
}
同步队列:类似阻塞队列,不同点在于没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
主要的方法:put、take
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue=new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println("put : a");
queue.put("a");
System.out.println("put : b");
queue.put("b");
System.out.println("put : c");
queue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("take : "+queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println("take : "+queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println("take : "+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"2").start();
}
}
运行结果:每次put后都在等待take以后继续put。
put : a
take : a
put : b
take : b
put : c
take : c
线程池
好处:
- 提高响应速度(减少创建时间)
- 降低资源消耗(可重用)
- 便于线程管理
ExecutorService
ExecutorService是线程池的接口,实现类为ThreadPoolExecutor,Executors是线程池创建的工具类
步骤:
- 使用Executors工具类创建线程池,返回ThreadPoolExecutor
- 设置线程池所需参数
- 调用ThreadPoolExecutor类执行方法 --> execute、submit等
线程池的常用方法:
Executors工具类(创造线程池)的常用方法:
newCacheThreadPool() //根据需求创造线程池(依据所需线程数量,默认创建数目可以很大)
newFIxedThreadPool() //可重用的固定线程池(常用方法)
newSingleThreadExecutor() //单个线程的线程池
newScheduledThreadPool() //创建一个线程池。在给定安排的延迟后执行
ThreadPoolExecutor(线程池)的常用方法:
executoe() //传入实现Runnable接口的实例
submit() //传入实现Callable or Runnable接口的实例
/*
submit() 一般是传入实现Callable接口的实例
然后用FutureTask类接收返回值
*/
shutdown() //关闭线程池
设置线程池参数!
corePoolSize() //核心线程数目
maximumPoolSize() //最大线程数
keepAliveTime() //线程无任务时,最多保持多长时间后终止
核心线程数目和最大线程数有什么区别?
核心线程数主要是初始化固定数目的线程数,而当线程需要很多的时候,一旦等候区也满了,那么就会继续开辟新的线程,此时线程数目也不会超过最大线程数。
新开辟的非核心线程,在没有任务的固定时间后就会关闭终止!
示例:
package com.zzt;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池的使用示例
**/
class myRunnable implements Runnable{
@Override
public void run(){
System.out.println("执行完成");
}
}
public class MyThreadPool {
public static void main(String[] args) {
// 1. 使用工具类初始化线程池
ThreadPoolExecutor service = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// 2. 实例化实现Runnable接口的对象
myRunnable myRunnable = new myRunnable();
// 3. 执行线程
service.execute(myRunnable);
// 4. 关闭线程池
service.shutdown();
}
}
实际开发线程池的用法
Executors实际上都是使用ThreadPoolExecutor来创建线程!
实际开发中,我们不会直接使用Executors来创建线程!
我们可以进入Executors.newCachedThreadPool()
中newCachedThreadPool
方法可以看到:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
设置的最大线程数太大了,不适合实际开发,会出现资源耗尽的风险。
所以我们要自己使用ThreadPoolExecutor
创建线程池
详解:ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, //核心线程数
int maximumPoolSize, //最大线程数目
long keepAliveTime, //线程无任务,保持最长时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列,等候区大小
ThreadFactory threadFactory,//创建工程,一般不改!
RejectedExecutionHandler handler) {//拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException(); //不符合设置规则
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();//缺少阻塞队列(等候区)、工厂、拒绝策略
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
创建线程池模板
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String[] args) {
int cpu = Runtime.getRuntime().availableProcessors();//获取可使用的cpu核数
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, cpu, 4,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),//等待3区大小
Executors.defaultThreadFactory());
try{
for(int i=1;i<=20;i++){
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" finished");
});
}
}finally {
threadPoolExecutor.shutdown();
}
}
}
拒绝策略
默认的拒绝策略是会抛出错误的,下面介绍四种拒绝策略
new ThreadPoolExecutor.AbortPolicy() // 队列满了后抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的就回哪去!比如主线程来的,那就回主线程执行
new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和早的竞争,不会抛出异常!
思考
如何设置线程的最大数目?
- 当cpu密集型的时候,通过Runtime.getRuntime().availableProcessors()获取最大核数,来设置最大线程数
- 当IO密集型,由于IO执行很慢,所以需要不断轮换切换,故设置的线程数,最好是大型IO数的两倍左右。
ForkJoin
ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量!
有点类似动态规划的思想,就是把大任务分成很多小任务实现。
ForkJoin 特点:工作窃取,维护的都是双端队列
ForkJoinTask的编写,首先继承父类,实现子类的方法(这里以RecursiveTask为例)。
判断是否小于临界值,如果小于那么就直接返回值,如果大于就创建两个新任务,加入执行
package ForkjoinDemo;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.RecursiveTask;
public class forkDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
// 临界值
private Long temp = 10000L;
public forkDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
// 计算方法
@Override
protected Long compute() {
if ((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else { // forkjoin 递归
long middle = (start + end) / 2; // 中间值
forkDemo task1 = new forkDemo(start, middle);
task1.fork(); // 拆分任务,把任务压入线程队列
forkDemo task2 = new forkDemo(middle+1, end);
task2.fork(); // 拆分任务,把任务压入线程队列
//返回!!!
return task1.join() + task2.join();
}
}
}
测试模块
package ForkjoinDemo;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new forkDemo(0L, 100000000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long aLong = submit.get();
System.out.println(aLong);
}
}
异步回调
使用Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
重要的方法介绍
1、runAsync & supplyAsync
CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方***使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- runAsync方法不支持返回值。
- supplyAsync可以支持返回值。
//无返回值
public static void runAsync() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("running");
});
future.get();//获取阻塞执行结果
}
//有返回值
public static void supplyAsync() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return 100L;
});
Long result = future.whenComplete((t, u) -> {
System.out.println("ok--> "+t);//返回正确执行的结果
System.out.println("no--> "+u);//输出错误信息
}).exceptionally((e) -> {
e.printStackTrace();
return 200L;//错误执行时的返回结果
}).get();
System.out.println("result = "+result);
}
2、thenApply 方法
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
该方法出现错误地的时候不会执行!
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T,? extends U> T:上一个任务返回结果的类型 U:当前任务的返回值类型
示例
private static void thenApply() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(()->{//supplier
long result = new Random().nextInt(100);
System.out.println("result1="+result);
return result;
}).thenApply((t)->{//function,传入一个参数返回一个结果!
long result = t*5;
System.out.println("result2="+result);
return result;
});
long result = future.get();
System.out.println(result);
}
3、handle 方法
handle 是执行任务完成时对结果的处理。 handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
示例
public static void handle() throws Exception{
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
int i= 10/0;
return new Random().nextInt(10);
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = -1;
if(throwable==null){
result = param * 2;
}else{
System.out.println(throwable.getMessage());
}
return result;
}
});
System.out.println(future.get());
}
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
4、thenCombine 合并任务
thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
示例
private static void thenCombine() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
return "hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
return "hello";
});
CompletableFuture<String> result = future1.thenCombine(future2, (t,u)->{
return t+" "+u;
});
System.out.println(result.get());
}
Volatile
学习Volatile要先了解一下JMM,理解java内存的操作
JMM
Java内存模型,不存在的东西,是一种约定!
JMM的三大特点:可见性、原子性、有序性
在JVM中运行的程序实体是线程,线程被创建的时候会被分配一个
工作内存
(有些地方为栈空间),工作内存是每个线程的私有数据域,而java内存内存模型规定所有变量都在主内存,主内存为共享内存区域,所有线程都可以访问。线程对变量的操作(读取写入)必须在工作内存中运行。
大概步骤:
- 把主内存数据拷贝到工作内存
- 对变量进行操作
- 将变量写回主内存
Java内存模型中定义了8种操作来完成,虚拟机保证了每种操作都是原子的。
- lock(锁定):作用于主存的变量,把一个变量标识为一条线程独占状态。
- unlock(解锁):作用于主存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
- read(读取):作用于主存变量,把一个变量的值从主存传输到工作内存。
- load(载入):作用于工作内存变量,把 read 来的值放入工作内存的变量副本中。
- use(使用):作用于工作内存变量,把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
- store(存储):作用于工作内存变量,把工作内存中一个变量的值传送到主存。
- write(写入):作用于主存变量,把 store 操作从工作内存中得到的变量的值放入主存的变量中。
在将变量从主内存读取到工作内存中,必须顺序执行read、load;要将变量从工作内存同步回主内存中,必须顺序执行store、write。并且这8种操作必须遵循以下规则:
- 1,不允许read和load、store和write操作之一单独出现。即不允许一个变量从主内存被读取了,但是工作内存不接受,或者从工作内存回写了但是主内存不接受。
- 2,不允许一个线程丢弃它最近的一个assign操作,即变量在工作内存被更改后必须同步改更改回主内存。
- 3,工作内存中的变量在没有执行过assign操作时,不允许无意义的同步回主内存。
- 4,在执行use前必须已执行load,在执行store前必须已执行assign。
- 5,一个变量在同一时刻只允许一个线程对其执行lock操作,一个线程可以对同一个变量执行多次lock,但必须执行相同次数的unlock操作才可解锁。
- 6,一个线程在lock一个变量的时候,将会清空工作内存中的此变量的值,执行引擎在use前必须重新read和load。
- 7,线程不允许unlock其他线程的lock操作。并且unlock操作必须是在本线程的lock操作之后。
- 8,在执行unlock之前,必须首先执行了store和write操作。
Volatile的作用
Volatile 是 Java 虚拟机提供轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排 ( 由于内存屏障,可以保证避免指令重排的现象产生! )
1. 可见性
package VolatileDemoPackage;
import java.util.concurrent.TimeUnit;
public class VolatileDemo {
public static void main(String[] args) {
VolatileNum num=new VolatileNum(0);
new Thread(()->{
while(num.getNum()==0){
//由于不可见性,所以会一直不结束!
//System.out.println("show:"+num.getNum());
//如果加了sout就可以见了,会自动结束!
/*因为System.out.println:
public void println(String x) {
synchronized (this) {
print(x);
newLine();
}
} */
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
num.setNum(1);
System.out.println(num.getNum());
}
}
class VolatileNum{
private volatile int num;
public VolatileNum(int num) {
this.num = num;
}
public int getNum() {
return num;
}
public synchronized void setNum(int num) {
this.num = num;
}
}
2. 不保证原子性
如果不加锁,那么输出结果会和预期有偏差,原因num++操作实际上不止一步(num++操作不具有原子性),途中被插入就会导致结果错误。
解决方式:加锁 或 采用原子变量( Atomic~)操作
package VolatileDemoPackage;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class yuanzixing {
/*
方法一:加锁实现原子性,不过效率低
*/
public static volatile int num1=0;
public static synchronized void add1(){
num1++;
}
//原子变量!
public static AtomicInteger num2=new AtomicInteger(0);
public static void add2(){
num2.getAndIncrement();
}
public static void main(String[] args) {
for(int i=0;i<10;i++){
new Thread(()->{
for(int j=0;j<2000;j++){
add1();
}
}).start();
new Thread(()->{
for(int j=0;j<2000;j++){
add2();
}
}).start();
}
while(Thread.activeCount()>2){ }//默认main、gc线程,
// 意思是等那十个线程都执行完了!再输出
System.out.println("num1 = "+num1);
System.out.println("num2 = "+num2);
}
}
那么原子变量是怎么保证原子性的呢?
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
// 参数介绍
// 第一个:当前对象
// 第二个:当前对象内存地址偏移值
}
CAS的核心类是:unsafe,java方法无法直接访问底层,需要通过本地(native)方法来实现对底层的访问,相当于一个后门,可以直接操控特定的内存数据。unsafe是通过偏移地址(valueOffset)获取数值的,故CAS是通过硬件功能,由若干个指令完成,并且能保证执行是连续的,保证了操作的原子性。
3. 禁止指令重排
指令重排是指jvm或操作系统为了提高效率,对指令的执行顺序的调整。(系统会考虑依赖程度,进行调整)
例子:
int a=1; //1
int b=1; //2
a = a+1; //3
b = a*a; //4
执行顺序为:1,2,3,4
实际可能执行顺序为:2,1,3,4
# 假设有两条线程,初始化a,b,x,y都为0
第一条:x=a,b=1
第二条:y=b,a=1
## 执行结果:
预期: x=0,y=0,a=1,b=1
可能会出现: x=1,y=1,a=1,b=1
#### 指令重排的后果就出现了
可是加了volatile后不会出现指令重排,可以正常实现预期效果。
内存屏障的作用:禁止上面和下面的指令交换顺序
4. volatile与synchronized的区别
- volatile本质是在告诉jvm当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取; synchronized则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住。
- volatile仅能使用在变量级别;synchronized则可以使用在变量、方法、和类级别的
- volatile仅能实现变量的修改可见性,不能保证原子性;而synchronized则可以保证变量的修改可见性和原子性
- volatile不会造成线程的阻塞;synchronized可能会造成线程的阻塞。
- volatile标记的变量不会被编译器优化;synchronized标记的变量可以被编译器优化
单例模式
单例模式就是在程序运行中只实例化一次,创建一个全局唯一对象,有点像 Java 的静态变量,但是单例模式要优于静态变量,静态变量在程序启动的时候JVM就会进行加载,如果不使用,会造成大量的资源浪费,单例模式能够实现懒加载,能够在使用实例的时候才去创建实例。开发工具类库中的很多工具类都应用了单例模式。
单例的实现思路
- 静态化实例对象
- 私有化构造方法,禁止通过构造方法创建实例
- 提供一个公共的静态方法,用来返回唯一实例
单例的好处
- 只有一个对象,内存开支少、性能好
- 在系统设置全局访问点,优化和共享资源访问
普通的单例模式
- 饿汉式(线程安全的)
public class X{
private static X a=new X();
private X(){}
public static X getInstance(){
return a;
}
}
- 懒汉式(线程不安全,所以要加synchronized变成线程安全,采用双重校验锁)
public class X{
private static X a;
private X(){}
private static Object key = new Obejct();
public static X getInstance(){
if(a == null){//提高效率
synchronized (key) {//线程安全模式
if(a==null){
a = new X();
}
}
}
return a;
}
}
然而这些都可以通过反射创建新的实例,存在漏洞。
package SingleDemoPackage;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
import java.lang.reflect.Constructor;
public class SingleDemo {
public static void main(String[] args) throws Exception {
X a=X.getInstance();
Constructor constructor = a.getClass().getDeclaredConstructor(null);
constructor.setAccessible(true);
X b = (X) constructor.newInstance();
X c=X.getInstance();
System.out.println(a==b);//使用反射创建的对象是新对象,输出:false
System.out.println(a==c);//getInstance为同一对象,输出:true
}
}
静态内部类单例模式
静态内部类单例模式也称单例持有者模式,实例由内部类创建,由于 JVM 在加载外部类的过程中, 是不会加载静态内部类的, 只有内部类的属性/方法被调用时才会被加载, 并初始化其静态属性。静态属性由static
修饰,保证只被实例化一次,并且严格保证实例化顺序。静态内部类单例模式代码如下:
public class SingletonDemo {
private SingletonDemo(){}
// 单例持有者
private static class InstanceHolder{
private final static SingletonObject6 instance = new SingletonObject6();
}
public static SingletonDemo getInstance(){
// 调用内部类属性
return InstanceHolder.instance;
}
}
静态内部类单例模式是一种优秀的单例模式,是开源项目中比较常用的一种单例模式。在没有加任何锁的情况下,保证了多线程下的安全,并且没有任何性能影响和空间的浪费。
枚举类单例模式
枚举类介绍:
- 枚举只能拥有私有的构造器
- 枚举类不允许被反序列化,Enum重写了方法
- 静态代码块中对final变量的值进行初始化
- enum类最终是一个final class
因为枚举类型是线程安全的,并且只会装载一次,设计者充分的利用了枚举的这个特性来实现单例模式,枚举的写法非常简单,而且枚举类型是所用单例实现中唯一一种不会被破坏的单例实现模式。
所以强烈推荐使用这个方式实现单例模式!
package SingleDemoPackage;/*
* @author: G_night
* 转载请申明作者
* Reprint please state the author
*/
public enum Singleton {
INSTANCE;
private String instance;
Singleton(){ instance = new String("hello"); }
public String getInstance(){
return instance;
}
}
后记
这只是开始,并不是结束~
提供本次参考的代码
及markdown文档
:https://gitee.com/g_night/JucStudy/tree/master
里面的代码不完全和上面示例的相同,仅是个人在验证想法过程中顺带写的。