阻塞队列

1.概念

阻塞队列,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致是:线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素。

当阻塞队列是空时,从队列中获取元素的操作将被阻塞。
当阻塞队列是满时,往队列里添加元素的操作将被阻塞。

2.优势

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦满足条件,被挂起的线程又会自动被唤醒。

3.使用:BlockingQueue

为什么需要BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一BlockingQueue都给你包办了。
因为之前还需要唤醒wait和阻塞sleep。

4.BlockingQueue核心方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

				抛出异常	特殊值		阻塞		超时
插入			add(e)		offer(e)		put(e)	offer(e, time, unit)
移除			remove()	poll()		take()	poll(time, unit)
检查			element()	peek()		不可用	不可用

四组不同的行为方式解释:

1(异常)

如果试图的操作无法立即执行,抛一个异常。

2(特定值)

如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。

3(阻塞)

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

4(超时)

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

5.架构梳理+种类分析

架构介绍:BlockingQueue和list都是Collections的接口。

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。
  3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  4. DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  5. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。

SynchronousQueue没有容量。
与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。
每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

public static void main(String[] args){
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(()->{
            try{
                System.out.println(Thread.currentThread().getName()+"\t put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"\t put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"\t put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        },"AAA").start();

        new Thread(()->{
            try{
                try{ TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());

                try{ TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());

                try{ TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        },"BBB").start();
    }

------out
AAA	 put 1
BBB	1
AAA	 put 2
BBB	2
AAA	 put 3
BBB	3
  1. LinkedTransferQueue:由链表结构组成的无界阻塞队列。
  2. LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

6.使用场景

a.生产者消费者模式

b.线程池

见下一篇博客

c.消息中间件

见下一篇博客

1.传统版

/** * @author DaXia_Hao * @date 2019/8/4 0004 19:29 * * 题目:一个初始值为0的变量,两个线程对其交替操作,一个+1,一个-1,来5轮 * * 1 线程 操作 资源类 * 2 判断 干活 通知 * 3 防止虚假唤醒机制 **/
class ShareData{//资源类
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception{
        lock.lock();
        try {
            //1.判断
            while (number!=0){
                //等待,不能生成
                condition.await();
            }
            //2.干活
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //3.通知唤醒
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

    public void decrement() throws Exception{
        lock.lock();
        try {
            //1.判断
            while (number==0){
                //等待,不能生成
                condition.await();
            }
            //2.干活
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //3.通知唤醒
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }
}
public class ProConsumer_TraditionDemo {
    public static void main(String[] args) {

        ShareData shareData = new ShareData();
        new Thread(()->{
            for (int i = 1; i <=5; i++) {

                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"AA").start();

        new Thread(()->{
            for (int i = 1; i <=5; i++) {

                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"BB").start();
    }
}

------out
AA	1
BB	0
AA	1
BB	0
AA	1
BB	0
AA	1
BB	0
AA	1
BB	0

2.阻塞队列版

!!!不用再控制await 和 sleep

class MyResource{
    private volatile boolean FLAG = true;//默认开启,进行生产+消费
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> blockingQueue = null;
    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void myProd() throws Exception{
        String data = null;
        boolean retValue;
        while(FLAG){
            data = atomicInteger.incrementAndGet()+"";
            retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS);
            if(retValue){
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
            }else{
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"\t生产停止");
    }

    public void myConsumer() throws Exception{
        String result = null;
        while(FLAG){
            result = blockingQueue.poll(2L,TimeUnit.SECONDS);
            if(null==result || result.equalsIgnoreCase("")){
                FLAG = false;
                System.out.println(Thread.currentThread().getName()+"\t 超过2秒,消费退出");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
        }
    }

    public void stop() throws Exception{
        this.FLAG = false;
    }
}

/* * volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用 * */

public class ProdConsumer_BlockQueueDemo {
    public static void main(String[] args) throws Exception{
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
            System.out.println();
            System.out.println();
            try{
                myResource.myProd();
            }catch (Exception e){
                e.printStackTrace();
            }
        },"Prod").start();

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
            try{
                myResource.myConsumer();
            }catch (Exception e){
                e.printStackTrace();
            }
        },"Consumer").start();

        try{TimeUnit.SECONDS.sleep(5);}catch (InterruptedException e){e.printStackTrace();}

        System.out.println();
        System.out.println();
        System.out.println();

        System.out.println("5秒钟到,main停止");
        myResource.stop();
    }
}

------out
prod	 生产线程启动
consumer	 消费线程启动
prod	 插入队列1成功
consumer	 消费队列1成功
prod	 插入队列2成功
consumer	 消费队列2成功
prod	 插入队列3成功
consumer	 消费队列3成功
prod	 插入队列4成功
consumer	 消费队列4成功
prod	 插入队列5成功
consumer	 消费队列5成功


5秒钟到,大老板main线程叫停,活动结束
prod	 大老板叫停了,Flag=false,生产结束
 超过2秒没有取到蛋糕,消费退出
全部评论

相关推荐

10-27 17:26
东北大学 Java
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务