阻塞队列
1.概念
阻塞队列,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致是:线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素。
当阻塞队列是空时,从队列中获取元素的操作将被阻塞。
当阻塞队列是满时,往队列里添加元素的操作将被阻塞。
2.优势
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦满足条件,被挂起的线程又会自动被唤醒。
3.使用:BlockingQueue
为什么需要BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一BlockingQueue都给你包办了。
因为之前还需要唤醒wait和阻塞sleep。
4.BlockingQueue核心方法
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
四组不同的行为方式解释:
1(异常)
如果试图的操作无法立即执行,抛一个异常。
2(特定值)
如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
3(阻塞)
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
4(超时)
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
5.架构梳理+种类分析
架构介绍:BlockingQueue和list都是Collections的接口。
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
SynchronousQueue没有容量。
与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。
每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
public static void main(String[] args){
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try{
System.out.println(Thread.currentThread().getName()+"\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e){
e.printStackTrace();
}
},"AAA").start();
new Thread(()->{
try{
try{ TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());
try{ TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());
try{ TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());
} catch (InterruptedException e){
e.printStackTrace();
}
},"BBB").start();
}
------out
AAA put 1
BBB 1
AAA put 2
BBB 2
AAA put 3
BBB 3
- LinkedTransferQueue:由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
6.使用场景
a.生产者消费者模式
b.线程池
见下一篇博客
c.消息中间件
见下一篇博客
1.传统版
/** * @author DaXia_Hao * @date 2019/8/4 0004 19:29 * * 题目:一个初始值为0的变量,两个线程对其交替操作,一个+1,一个-1,来5轮 * * 1 线程 操作 资源类 * 2 判断 干活 通知 * 3 防止虚假唤醒机制 **/
class ShareData{//资源类
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
lock.lock();
try {
//1.判断
while (number!=0){
//等待,不能生成
condition.await();
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知唤醒
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement() throws Exception{
lock.lock();
try {
//1.判断
while (number==0){
//等待,不能生成
condition.await();
}
//2.干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知唤醒
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
public class ProConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
for (int i = 1; i <=5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <=5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
},"BB").start();
}
}
------out
AA 1
BB 0
AA 1
BB 0
AA 1
BB 0
AA 1
BB 0
AA 1
BB 0
2.阻塞队列版
!!!不用再控制await 和 sleep
class MyResource{
private volatile boolean FLAG = true;//默认开启,进行生产+消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception{
String data = null;
boolean retValue;
while(FLAG){
data = atomicInteger.incrementAndGet()+"";
retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS);
if(retValue){
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"\t生产停止");
}
public void myConsumer() throws Exception{
String result = null;
while(FLAG){
result = blockingQueue.poll(2L,TimeUnit.SECONDS);
if(null==result || result.equalsIgnoreCase("")){
FLAG = false;
System.out.println(Thread.currentThread().getName()+"\t 超过2秒,消费退出");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
}
}
public void stop() throws Exception{
this.FLAG = false;
}
}
/* * volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用 * */
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) throws Exception{
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
System.out.println();
System.out.println();
try{
myResource.myProd();
}catch (Exception e){
e.printStackTrace();
}
},"Prod").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
try{
myResource.myConsumer();
}catch (Exception e){
e.printStackTrace();
}
},"Consumer").start();
try{TimeUnit.SECONDS.sleep(5);}catch (InterruptedException e){e.printStackTrace();}
System.out.println();
System.out.println();
System.out.println();
System.out.println("5秒钟到,main停止");
myResource.stop();
}
}
------out
prod 生产线程启动
consumer 消费线程启动
prod 插入队列1成功
consumer 消费队列1成功
prod 插入队列2成功
consumer 消费队列2成功
prod 插入队列3成功
consumer 消费队列3成功
prod 插入队列4成功
consumer 消费队列4成功
prod 插入队列5成功
consumer 消费队列5成功
5秒钟到,大老板main线程叫停,活动结束
prod 大老板叫停了,Flag=false,生产结束
超过2秒没有取到蛋糕,消费退出