Java多线程
Java多线程基础常见面试题
本文参考了一些技术文章,并加上自己的理解整理出来的一些Java多线程基础常见面试题解答,会持续更新。供本人复习可用,路过的小伙伴要是感兴趣也可以看看,欢迎指错。
什么是进程和线程?
进程
==进程是正在运行的程序,它单独占有内存地址空间。==系统运行一个程序就是一个进程从创建,运行到消亡的过程。
为什么会有进程的出现呢?
进程让操作系统的并发成为了可能。因为在早期的批处理操作系统中,虽然用户可以将多个需要执行的程序写在磁带上交给计算机处理,但计算机的内存始终只有一个程序在运行,也就是串行运行方式,效率是非常低的,所以就有聪明的科学人提出了进程的概念。
此时CPU采用了时间片轮转的方式运行进程。当时间片结束时,当前进程还在运行,CPU会暂停这个进程运行,并运行下一个时间片的进程。(这叫上下文切换)。当时间片还未结束,当前进程就结束或者阻塞了,CPU也会立马切换,不用等时间片用完。
比方说,当我们启动main函数时就是启动了一个JVM进程,而main函数所在的线程就是这个进程中的一个线程,也称主线程。
线程
线程是比进程还小的执行单位,它负责执行进程中的子任务。一个线程能够分割成多个线程来执行。
为什么我们还要使用多线程呢?
当随着需求的发展,我们不再想一个程序只实现一个功能,而是多个功能,这就需要进程背负多个子任务。但是这样只能逐个有序地去执行这些子任务,效率很低。比方说,一款视频软件在播放视频的时候,不能发评论,得等视频播放完才可以评论。这样是很不爽的。
因此有了线程,线程让进程内部的并发成了可能。
而且还有以下好处:
- 线程共享进程的堆和方法区资源,从而实现简单通信。而进程间存在内存隔离,数据共享和程序通信是比较复杂的。
- 除了共享资源区域外,每个线程还拥有自己的程序计数器、虚拟机栈和本地方法栈,所以进行线程切换时开销要比切换进程小。因此线程也被叫做轻量级进程。
它们的区别是什么?
本质的区别是进程是操作系统进行资源分配的基本单位,线程是操作系统进行调度的基本单位。
除此之外:
- 进程单独占有一定的内存地址空间,通信复杂同步简单;而线程共享所属进程的数据,通信简单但同步复杂。
- 进程的创建和销毁不仅要保存寄存器和栈信息,还要资源的分配回收以及页调度,开销比较大;而线程只需要保存前两者的信息,开销比较小。
什么是上下文切换?
上下文切换是指进程/线程从保存到再加载的过程。
CPU通过为每个线程分配时间片实现任务并发执行功能,在切换的过程中,会保存上一个任务的状态,以保证下次切换回来时是正确的执行位置。
创建线程的方式有哪几种?
总共有以下四种:
继承Thread类
public class MyThread extends Thread{ @Override public void run() { System.out.println("MyThread"); } }
实现Runnable接口
实现接口会比继承Thread类灵活。
//第一种方式 public class MyThread2 implements Runnable{ @Override public void run() { System.out.println("MyThread2"); } } public class Demo2 { public static void main(String[] args) { //start方法是Thread类拥有的 MyThread2 myThread2 = new MyThread2(); new Thread(myThread2).start(); //第二种方式:Java8之后 使用函数式接口 new Thread(()->{ System.out.println("MyThread2"); }).start(); } }
实现Callable接口
Runnable方法没有返回值,而Callable接口的方法有并且能抛出异常。
我们通过JDK文档找到了将Callable和Thread建立关联的类FutureTask
一般配合线程池一起使用,下面举个栗子:
public class CallableDemo { static class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { //模拟计算需要一秒 Thread.sleep(1000); return 2; } } public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> result = new FutureTask<>(task); executor.submit(result); //会阻塞当前线程,直到获取结果 System.out.println(result.get()); } }
结果:
2
Runable与Callable的区别
Runnable ⾃Java 1.0以来⼀直存在,但 Callable 仅在Java 1.5中引⼊,⽬的就是为了来处理 Runnable 不⽀持的⽤例。 Runnable 接⼝不会返回结果或抛出检查异常,但是 Callable 接⼝可以。所以,如果任务不需要返回结果或抛出异常推荐使⽤ Runnable 接⼝,这样代码看起来会更加简洁。
⼯具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(
Executors.callable(Runnable task )
或Executors.callable(Runnable task,Object resule)
)。
- 通过线程池来创建
Java线程的状态以及如何转换
我们知道操作系统的状态有三个:
- ready (就绪状态)线程正在等待CPU的调度,即将进入running状态。
- running (执行状态)线程正在使用CPU。
- waiting (等待状态)线程经过等待事件或者正在等待其他资源。
Java线程有六大状态
它把ready和running状态合并成了RUNNABLE状态,把waiting状态拆分成了阻塞、等待和超时等待三个状态。
wait方法和sleep方法的区别
- sleep方法释放CPU资源, 没有释放锁;而wait方法释放CPU资源同时释放了锁。
- sleep方法是Thread,而wait方法是Object的。
- sleep方法通常用来暂停线程的运行,而wait方法通常用来通信与交互,跟notify()方法搭配。
- wait可以指定时间,也可以不指定;而sleep必须指定时间。
- wait必须放在同步块或者同步方法中,但sleep可以在任意位置。
Java线程间的通信有哪几种?
线程同步 (按一定顺序执行)需要 synchronized锁
等待、通知 wait() notify()
信号量 用 volatile来实现
- volatile可以实现内存的可见性
- int volatile signal;
管道通信 多半与I/O流有关,传输文件或者字符串
- JDK提供了PipeWriter、PipeReader、PipeInputStream、PipeOutputStream
其他方式
join() 相当于被启动的那个线程是宾客,启动它的那个线程是主人,等宾客走了,主人才会结束掉宴会打扫。
sleep()
- 与 wait的区别
ThreadLocal类
原理:JVM 中维护一个Map,这个Map的key 就是当前的线程对象,而value则是线程通过ThreadLocal.set方法保存的对象实例。当线程调用ThreadLocal.get方法时,ThreadLocal会根据当前线程对象的引用,取出Map中对应的对象返回。这样,ThreadLocal通过以各个线程对象的引用作为区分,从而将不同线程的变量隔离开来。
ThreadLocal的作用:实现线程范围内的局部变量,即ThreadLocal在一个线程中是共享的,在不同线程之间是隔离的。因为每个Thread对象都有它自己存放进ThreadLocalMap的值。
适用场景:
每个线程需要有自己单独的实例
实例需要在多个方法***享,但不希望被多线程共享
最常见的用来解决数据库连接、Session管理等。https://my.oschina.net/huangyong/blog/159725
简单地说,就是多线程情况下,如果没有ThreadLocal,连接有可能被其他线程关闭。
InheritableThreadLocal
- 不仅仅是当前线程可以存取副本值,而且它的子线程也可以存取这个副本值。
线程池
为什么要用线程池
池化技术,线程池、数据库连接池、Http连接池等等都是这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
使用线程池的好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可用不需要等到线程创建就能执行。
- 提高线程的可管理性。 使用线程池可以进行统一的分配,调优和监控。
7大参数
ThreadPoolExecutor
有四个构造函数,有7大参数:
int corPoolSize
该线程池中核心线程数最大值int maximumPoolSize
该线程池中线程总数最大值long keepAliveTime
非核心线程闲置超时时长TimeUnit unit
keepAliveTime
的单位TimeUnit是⼀个枚举类型 ,包括以下属性:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 =
1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒
MINUTES : 分 HOURS : ⼩时 DAYS : 天
BlockingQueue workQueue
阻塞队列,维护着等待执行的Runnable
任务对象。ThreadFactory threadFactory
创建线程的工厂,用于批量创建线程,统一在创建线程时设置一些参数,如是否为守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。RejectedExecutionHandler handler
拒绝处理策略
原理
线程池的状态
线程池本身有⼀个调度线程,这个线程就是⽤于管理布控整个线程池⾥的各种任务 和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。
故线程池也有⾃⼰的状态。 ThreadPoolExecutor 类中定义了⼀个 volatile int 变量runState来表示线程池的状态 ,分别为RUNNING、SHURDOWN、STOP、TIDYING 、TERMINATED。
线程池创建后处于RUNNING状态。
调⽤shutdown()⽅法后处于SHUTDOWN状态,线程池不能接受新的任务,清 除⼀些空闲worker,会等待阻塞队列的任务完成。
调⽤shutdownNow()⽅法后处于STOP状态,线程池不能接受新的任务,中断 所有线程,阻塞队列中没有被执⾏的任务全部丢弃。此时,poolsize=0,阻塞队 列的size也为0。
当所有的任务已终⽌,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。
接着会执⾏terminated()函数。
ThreadPoolExecutor中有⼀个控制状态的属性叫ctl,它是⼀个AtomicInteger类型的变量。
线程池处在TIDYING状态时,执⾏完terminated()⽅法之后,就会由 TIDYING**-> TERMINATED**, 线程池被设置为TERMINATED状态。
主要任务处理流程
处理任务的核心方法是execute
,我们看看JDK1.8源码中的ThreadPoolExecutor
是如何处理线程任务的:
// JDK 1.8 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.当前线程数⼩于corePoolSize,则调⽤addWorker创建核⼼线程执⾏任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果不⼩于corePoolSize,则将任务添加到workQueue队列。 //workQueue.offer若是满了,会返回false if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执⾏拒绝策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 2.2 线程池处于running状态,但是没有线程,则创建线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.如果放⼊workQueue失败,则创建⾮核⼼线程执⾏任务, // 如果这时创建⾮核⼼线程失败(当前线程总数不⼩于maximumPoolSize时),就会执⾏拒绝策略。 else if (!addWorker(command, false)) reject(command); }
大致过程:
总结一下
- 线程总数量 <
corePoolSize
,无论线程是否空闲,都会新建一个核心线程执行任务 (让核心线程数量快速达到corePoolSzie
)。注意,这一步需要获得全局锁。 - 线程总数量 >=
corePoolSize
,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行。体现了线程复用 - 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
- 缓存队列满了,且总线程数达到了
maximumPoolSize
,则会采取上面提到的拒绝策略进行处理。
如何做到线程复用
我们知道,⼀个线程在创建的时候会指定⼀个线程任务,当执⾏完这个线程任务之 后,线程⾃动销毁。但是线程池却可以复⽤线程,即⼀个线程执⾏完线程任务后不 销毁,继续执⾏另外的线程任务。那么,线程池如何做到线程复⽤呢?
原来,ThreadPoolExecutor在创建线程时,会将线程封装成⼯作线程worker,并放 ⼊⼯作线程组中,然后这个worker反复从阻塞队列中拿任务去执⾏。
我们来看源码进行分析:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || //1. 若core是true,证明要创建的线程是核心线程,跟核心线程的阈值比较; //2, 若core是false,证明要创建的线程不是核心线程,跟最大阈值比较 wc >= (core ? corePoolSize : maximumPoolSize)) //超过,则返回false;然后上个方法再调用拒绝策略 return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //1. 创建一个Worker对象 w = new Worker(firstTask); //2.实例化一个Thread对象 final Thread t = w.thread; if (t != null) { //3.线程池全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //4.启动这个线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker()
上半部分是判断是否超过阈值,若超过则返回fasle;
下半部分是创建worker
对象,并初始化一个Thread
对象,然后启动它。
我们接着来看worker
类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); //线程任务就是自己 } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } //当t.start时,这个run方***被jvm调用 ...其余代码省略
Worker
类继承了Runnable
接口,所以Worker
也是一个线程任务,在构造器中创建了一个线程,而这个线程的任务就是自己。所以addWorker
的下半部分的t.start()
时会触发Worker
类的runWorker(this)
被jvm调用。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //worker的任务,也就是我们传进去的任务 Runnable task = w.firstTask; w.firstTask = null; //1. 线程启动后,通过unlock去释放锁 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //2. Worker执行我们传进去的任务,或者自己去任务队列取任务 while (task != null || (task = getTask()) != null) { //会不断的去拿任务 //2.1 进行加锁操作,保证线程不被其他线程中断,除非线程池被中断 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //2.2 检查线程池状态,如果处于中断状态,当前线程被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
首先去执行创建这个worker
时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while
循环中,worker
会不断地调用getTask()
方法从阻塞队列中获取任务然后调用task.run()
执行任务,从而达到线程复用的目的。只要getTask()
不返回null
,此线程就不会退出。
最后看下getTask()
方法
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //1. allowCoreThreadTimeOut默认是false,就是核心线程即使空闲也不会被销毁 //timed为true的情况有 核心线程只能超时等待 或 目前worker数大于核心线程数 //若为true 核心线程在keepAliveTime内仍空闲则会被销毁 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //2. 如果运行线程超过了最大线程数 或 超时后设置了true //这时候任务队列已空,会去递减Worker数量,并返回null,这时候runWorker()结束,线程被回收 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //3. 若为true 超时 poll返回null; //若为false 核心线程会被卡在take()方法里,并被挂起,等待任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //两个方法都是拿任务 if (r != null) //返回任务 return r; //拿不到任务,设置为true 在下一轮循环递减worker数 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
核心线程的会一直卡在workQueue.take
方法里,被挂起,不会占用CPU资源,直到拿到Runnable
然后返回(当然还得看allowCoreThreadTimeOut
,如果它设置为true
,那么核心线程就会去调用poll
方法,因为poll
方法可能会返回null
,所以这时候核心线程满足超时条件也会被销毁)。
⾮核⼼线程会workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) ,如果 超时还没有拿到,下⼀次循环判断compareAndDecrementWorkerCount就会返 回 null ,Worker对象的 run() ⽅法循环体的判断为 null ,任务结束,然后线程被系统回收 。
4种拒绝策略
拒绝处理策略,线程数量⼤于最⼤线程数就会采⽤拒绝处理策略,四种拒绝处理的策略为 :
ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出
RejectedExecutionException
异常。ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执⾏程序(如果再次失败,重复此过程)。
ThreadPoolExecutor.CallerRunsPolicy:由调⽤线程处理该任务。
4种常见的线程池
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CacheThreadPool 的运⾏流程如下:
提交任务进线程池。
因为corePoolSize为0的关系,不创建核⼼线程,线程池最⼤为Integer.MAX_VALUE。
尝试将任务添加到SynchronousQueue队列。
如果SynchronousQueue⼊列成功,等待被当前运⾏的线程空闲后拉取执⾏。 如果当前没有空闲线程,那么就创建⼀个⾮核⼼线程,然后从SynchronousQueue拉取任务并在当前线程执⾏。
如果SynchronousQueue已有任务在等待,⼊列操作将会阻塞。
当需要执⾏很多短时间的任务时,CacheThreadPool的线程复⽤率⽐较⾼, 会显 著的提⾼性能。⽽且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占⽤很多资源。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
核⼼线程数量和总线程数量相等,都是传⼊的参数nThreads,所以只能创建核⼼线 程,不能创建⾮核⼼线程。因为LinkedBlockingQueue的默认⼤⼩是Integer.MAX_VALUE,故如果核⼼线程空闲,则交给核⼼线程处理;如果核⼼线程 不空闲,则⼊列等待,直到核⼼线程空闲。
与CachedThreadPool的区别:
因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核 ⼼线程。 ⽽CachedThreadPool因为corePoolSize=0,所以只会创建⾮核⼼线程。
在 getTask() ⽅法,如果队列⾥没有任务可取,线程会⼀直阻塞在LinkedBlockingQueue.take() ,线程不会被回收CachedThreadPool会在60s后收回。
由于线程不会被回收,会一直卡着阻塞,所以没有任务的情况下,
newFixedThreadPool
占用资源更多。都几乎不会触发拒绝策略,但是原理不同。
newFixedThreadPool
是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;newCachedThreadPool
是因为线程池很大,几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
有且仅有⼀个核⼼线程( corePoolSize == maximumPoolSize=1),使⽤了LinkedBlockingQueue(容量很⼤),所以,不会创建⾮核⼼线程。所有任务按照 先来先执⾏的顺序执⾏。如果这个唯⼀的线程不空闲,那么新来的任务会存储在任 务队列⾥等待执⾏。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
四种常⻅的线程池基本够我们使⽤了,但是《阿⾥把把开发⼿册》不建议我们直接 使⽤Executors类中的线程池,⽽是通过 ThreadPoolExecutor 的⽅式,这样的处理 ⽅式让写的同学需要更加明确线程池的运⾏规则,规避资源耗尽的⻛险。 但如果你及团队本身对线程池⾮常熟悉,⼜确定业务规模不会⼤到资源耗尽的程度 (⽐如线程数量或任务队列⻓度可能达到Integer.MAX_VALUE)时,其实是可以使⽤JDK提供的这⼏个接⼝的,它能让我们的代码具有更强的可读性。
阻塞队列
BlockingQueue
接口提供了线程安全的队列访问方式,让我们不用担心在多线程环境下存、取共享变量的线程安全问题。
阻塞队列提供了四组不同的⽅法⽤于插⼊、移除、检查元素:
抛出异常:如果试图的操作⽆法⽴即执⾏,抛异常。当阻塞队列满时候,再往队列⾥插⼊元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列⾥获取元素时会抛出NoSuchElementException异常 。
返回特殊值:如果试图的操作⽆法⽴即执⾏,返回⼀个特殊值,通常是true / false。
⼀直阻塞:如果试图的操作⽆法⽴即执⾏,则⼀直阻塞或者响应中断。
超时退出:如果试图的操作⽆法⽴即执⾏,该⽅法调⽤将会发⽣阻塞,直到能 够执⾏,但等待时间不会超过给定值。返回⼀个特定值以告知该操作是否成功,通常是 true / false
BlockingQueue的实现类
ArrayBlockingQueue
由数组结构组成的有界阻塞队列。内部结构是数组。
public ArrayBlockingQueue(int capacity, boolean fair){ //..省略代码 }
可以初始化队列⼤⼩, 且⼀旦初始化不能改变。构造⽅法中的fair表示控制对象的 内部锁是否采⽤公平锁,默认是⾮公平锁。
LinkedBlockingQueue
由链表结构组成的有界阻塞队列。。内部结构是链表,具有链表的特性。默认队列的 ⼤⼩是 Integer.MAX_VALUE ,也可以指定⼤⼩。此队列按照先进先出的原则对元素进⾏排序。
DelayQueue
该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue
是一个没有大小限制的无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞。
PriorityBlockingQueue
基于优先级的无界阻塞队列。
SynchronousQueue
没有任务容量,每个put
必须等待一个take
,反之亦然。
需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。
以下⽅法的返回值,可以帮助理解这个队列:
iterator() 永远返回空,因为⾥⾯没有东⻄
peek() 永远返回null
put() 往queue放进去⼀个element以后就⼀直wait直到有其他thread进来把这个element取⾛。
offer() 往queue⾥放⼀个element后⽴即返回,如果碰巧这个element被另⼀个
thread取⾛了,offer⽅法返回true,认为offer成功;否则返回false。
take() 取出并且remove掉queue⾥的element,取不到东⻄他会⼀直等。
poll() 取出并且remove掉queue⾥的element,只有到碰巧另外⼀个线程正在往
queue⾥offer数据或者put数据的时候,该⽅法才会取到东⻄。否则⽴即返回null。
isEmpty() 永远返回true
remove()&removeAll() 永远返回false
注意
PriorityBlockingQueue
不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用时要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有可用堆内存空间。对于默认大小的LinkedBlockingQueue
也是一样的。
阻塞队列的原理
/** The queued items */ //数据元素数组 final Object[] items; /** items index for next take, poll, peek or remove */ //下一个待取出元素索引 int takeIndex; /** items index for next put, offer, or add */ //下一个待添加元素索引 int putIndex; /** Number of elements in the queue */ //元素个数 int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ //内部锁 final ReentrantLock lock; /** Condition for waiting takes */ //消费者监视器 private final Condition notEmpty; /** Condition for waiting puts */ //生产者监视器 private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { //..省略其他代码 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public Condition newCondition() { return sync.newCondition(); } final ConditionObject newCondition() { return new ConditionObject(); }
put
操作源码
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 1.⾃旋拿锁 lock.lockInterruptibly(); try { // 2.判断队列是否满了 while (count == items.length) // 2.1如果满了,阻塞该线程,并标记为notFull线程, // 等待notFull的唤醒,唤醒之后继续执⾏while循环。 notFull.await(); // 3.如果没有满,则进⼊队列 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; //入完后队列已满,索引置0 if (++putIndex == items.length) putIndex = 0; //元素个数加1 count++; // 4 唤醒⼀个等待的线程 notEmpty.signal(); }
总结put
流程:
- 所有执行
put
操作的线程竞争lock
锁,拿到lock
锁的线程进入下一步,没有拿到的自旋竞争锁。 - 判断阻塞队列是否满了,如果满了就调用
await
方法阻塞这个线程,并标记为notFull
(生产者)线程,同时释放lock
锁,等待被消费者线程唤醒。 - 如果没有满,则调用
enqueue
方法将元素put
进阻塞队列。注意这一步还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。 - 唤醒一个标记为
notEmpty
(消费者)的线程。
take
操作源码:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //若当前数据为空,线程被阻塞 while (count == 0) notEmpty.await(); //取数据 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //唤醒一个生产者线程 notFull.signal(); return x; }
总结take
操作的流程:
- 所有执行
take
操作的线程竞争lock
锁,拿到了lock
锁的线程进入下一步,没有拿到的自旋竞争锁。 - 判断阻塞队列是否为空,如果为空,则调用
await
方法阻塞这个线程,并标记为notEmpty
(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。 - 如果没有空,则调用
dequeue
方法。注意这一步还有一种情况是第二步阻塞的线程被唤醒且又拿到了lock
锁的线程。 - 唤醒一个标记为
notFull
(生产者)线程。
注意
put和tack操作都需要先获取锁,没有获取到锁的线程会被挡在第⼀道⼤⻔之外⾃旋拿锁,直到获取到锁。
就算拿到锁了之后,也不⼀定会顺利进⾏put/take操作,需要判断队列是否可⽤(是否满/空),如果不可⽤,则会被阻塞,并释放锁。
在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执⾏,否则,⾃旋拿锁,拿到锁了再while判断队列是否可⽤(这也是为什么不⽤if判断,⽽使⽤while判断的原因)。
使用场景
生产者-消费者模式
public class ProducerAndConsumerDemo { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize); public static void main(String[] args) { ProducerAndConsumerDemo demo = new ProducerAndConsumerDemo(); Producer producer = demo.new Producer(); Consumer consumer = demo.new Consumer(); producer.start(); consumer.start(); } class Producer extends Thread{ @Override public void run() { produce(); } private void produce(){ while(true){ try { queue.put(1); System.out.println("向队列插入一个元素,队列剩余空间"+queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume(){ while(true){ try { queue.take(); System.out.println("从队列取出一个元素,队列剩余"+queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
手写BlockingQueue
package bq; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MyBlockingQueue { //数据元素数组 private Object[] items; //元素个数 private int count; //内部锁 private ReentrantLock lock; //下一个待添加元素索引 private int toPutIndex; //下一个待取出元素索引 private int toTakeIndex; //生产者 private Condition notFull; //消费者 private Condition notEmpty; MyBlockingQueue(int capacity, boolean fair){ this.lock = new ReentrantLock(fair); this.items = new Object[capacity]; this.notFull = lock.newCondition(); this.notEmpty = lock.newCondition(); } public void put(Object o) throws InterruptedException { if(o == null) throw new NullPointerException(); ReentrantLock lock = this.lock; lock.lockInterruptibly(); try{ while(count == items.length) notFull.await(); enqueue(o); System.out.println("放入了"+o); }finally { lock.unlock(); } } public void enqueue(Object o){ Object[] items = this.items; items[toPutIndex] = o; if(++toPutIndex == items.length) toPutIndex = 0; count++; notEmpty.signal(); } public Object take() throws InterruptedException { ReentrantLock lock = this.lock; lock.lockInterruptibly(); try{ while(count == 0) notEmpty.await(); return dequeue(); }finally { lock.unlock(); } } public Object dequeue(){ Object[] items = this.items; Object result = items[toTakeIndex]; items[toTakeIndex] = null; if(++toTakeIndex == items.length) toTakeIndex = 0; count--; notFull.signal(); return result; } public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10,true); new Thread(()->{ try { for (int i = 0; i < 10; i++) { myBlockingQueue.put(i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { for (int i = 0; i < 10; i++) { Thread.sleep(2000); System.out.println("拿出了"+myBlockingQueue.take()); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
结果:
放入了0
放入了1
拿出了0
放入了2
放入了3
拿出了1
放入了4
放入了5
放入了6
拿出了2
放入了7
放入了8
拿出了3
放入了9
拿出了4
拿出了5
拿出了6
拿出了7
拿出了8
拿出了9
CAS
当涉及到悲观锁和乐观锁时,悲观锁思想的实现有很多种我们熟悉的,比如Java中的synchronize
和Reentrenlock
,以及数据库中的行锁和表锁等。
而乐观锁有CAS和多版本机制,乐观锁总是假设对共享资源的访问不会有冲突,线程可以不停地执行,无需加锁也无需等待。而一旦多个线程发生冲突,乐观锁通常是使用一种称为CAS的技术来保证线程执行的安全性。
CAS的概念
CAS是一种原子操作,从CPU层面保证了它的原子性。
当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新。其余会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
Java实现CAS的原理 —— 原子类
JUC当中原子类有四类:
基本类型
使⽤原⼦的⽅式更新基本类型
AtomicInteger :整形原⼦类
AtomicLong :⻓整型原⼦类
AtomicBoolean :布尔型原⼦类
数组类型
使⽤原⼦的⽅式更新数组⾥的某个元素
AtomicIntegerArray :整形数组原⼦类
AtomicLongArray :⻓整形数组原⼦类
AtomicReferenceArray :引⽤类型数组原⼦类
引⽤类型
AtomicReference :引⽤类型原⼦类
AtomicStampedReference :原⼦更新带有版本号的引⽤类型。该类将整数值与引⽤关联起来,可⽤于解决原⼦的更新数据和数据的版本号,可以解决使⽤ CAS 进⾏原⼦更新时可能出现的 ABA 问题。
AtomicMarkableReference :原⼦更新带有标记位的引⽤类型
对象的属性修改类型
AtomicIntegerFieldUpdater :原⼦更新整形字段的更新器
AtomicLongFieldUpdater :原⼦更新⻓整形字段的更新器
AtomicReferenceFieldUpdater :原⼦更新引⽤类型字段的更新器
AtomicInteger
类的源码分析
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用) private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; //CAS操作修改值 public final int getAndSet(int newValue) { return unsafe.getAndSetInt(this, valueOffset, newValue); } //unsafe类的方法 public final int getAndSetInt(Object var1, long var2, int var4) { int var5; do { //通过value这个对象字段偏移量和AtomicInteger这个类对象得到的主内存的值 var5 = this.getIntVolatile(var1, var2); //跟工作内存的比较,失败则自旋 } while(!this.compareAndSwapInt(var1, var2, var5, var4)); //返回旧值 return var5; }
AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。
CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址。另外 value 是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。
CAS造成的问题
ABA问题 可以用版本号解决,或者是使用Java的atomic包里提供的一个类AtomicStampedReference
来解决问题。
循环时间长开销大
CAS多与自旋结合。如果自旋CAS长时间不成功,会占用大量的CPU资源。
可以用pause指令解决
只能保证一个共享变量的原子性
有两种解决方案:
- 使用JDK1.5开始就提供的
AtomicReference
类保证对象之间的原子性,把变量放到一个对象里边进行CAS操作。 - 使用锁。