Java阻塞队列

Java中的阻塞队列

1.什么是阻塞队列

阻塞队列(BlockingQueue),就是支持阻塞插入和阻塞移除的队列。
1)阻塞插入:当队列满时,会阻塞插入元素的线程,知道队列不为满时。
2) 阻塞移除:当队列为空时,获取元素的线程会阻塞,等待队列变为非空。

2.运用场景

阻塞队列可用于生产者和消费者问题,生产者就是向队列里插入元素,而消费者就是从队列里获取元素。这时,阻塞队列就可以充当生产者存元素、消费者取元素的容器。

3.消息介绍

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

抛出异常: 当队列满插入或者队列空移除时,会抛出相应的异常。
返回特殊值:插入队列返回插入是否成功,移除队列,返回相应的值,没有则返回null。
一直阻塞:也就上面的消费者和生产者的情况,队列满插入则阻塞,队列空移除也是阻塞。
超时退出:同样的道理,也是会阻塞一段时间,超时则线程退出。

4.JDK提供的阻塞队列API

jdk1.8中的阻塞队列 ,接口BlockingQueue,继承结构


ArrayBlockingQueue: 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue: 链表结构的有界阻塞队列
PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
DelayQueue: 使用优先级队列实现的无界阻塞队列
SynchronousQueue: 不存储元素的阻塞队列
LinkedTransferQueue: 链表结构的无界阻塞队列
LinkedBlockingDeque: 链表结构的双向阻塞队列

5.讨论ArrayBlockingQueue

阻塞队列很多,我们拿一个来举例;
JDK8中的部分源码
三个构造方法,可以发现,ArrayBlockingQueue必须知道大小,默认是非公平的

 /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and default access policy. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity < 1} */
    public ArrayBlockingQueue(int capacity) {
   
        this(capacity, false);
    }

    /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */
    public ArrayBlockingQueue(int capacity, boolean fair) {
   
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity, the specified access policy and initially containing the * elements of the given collection, * added in traversal order of the collection's iterator. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @param c the collection of elements to initially contain * @throws IllegalArgumentException if {@code capacity} is less than * {@code c.size()}, or less than 1. * @throws NullPointerException if the specified collection or any * of its elements are null */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
   
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
   
            int i = 0;
            try {
   
                for (E e : c) {
   
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
   
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
   
            lock.unlock();
        }
    }

一些属性:可以发现,使用了ReentrantLock来实现访问公平性。

 /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */
    transient Itrs itrs = null;

提问:如果队列是空的,消费者一直等待,当生产者添加元素时,消费者如何知道队列由元素?
使用通知模式实现 ,所谓通知模式,就是当生产者往满的队列添加元素时,会发生阻塞,当消费者消费了一个元素后,会通知生产者当前队列可用,JDK源码中使用了Condition来实现:

   /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    // 省略 代码
 	public ArrayBlockingQueue(int capacity, boolean fair) {
   
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    // 省略代码
      /** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
    public void put(E e) throws InterruptedException {
   
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
   
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
   
            lock.unlock();
        }
    }
    // 省略代码
    public E take() throws InterruptedException {
   
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
   
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
   
            lock.unlock();
        }
    }

6.阻塞队列演示消费者与生产者模型

public class TestConsumerAndProducerBlockingQueue {
   

    public static void main(String[] args) {
   
        // 进行测试 创建一个销售员对象
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // 创建两个消费者,两个生产者
        new Thread(new TestConsumer(queue),"消费者A").start();
        new Thread(new TestProducer(queue),"生产者B").start();
        new Thread(new TestConsumer(queue),"消费者C").start();
        new Thread(new TestProducer(queue),"生产者D").start();

    }

}

// 定义消费者
class TestConsumer implements Runnable {
   

    private ArrayBlockingQueue<Integer> product;

    public TestConsumer(ArrayBlockingQueue queue) {
   
        this.product = queue;
    }

    @Override
    public void run() {
   
        // 进行十次消费
        for (int i = 0; i < 10; i++) {
   
            try {
   
                System.out.println(Thread.currentThread().getName() + "消费了:" + product.take());
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }
        }
    }
}

// 定义生产者
class TestProducer implements Runnable {
   

    private ArrayBlockingQueue<Integer> product;

    public TestProducer(ArrayBlockingQueue queue) {
   
        this.product = queue;
    }

    @Override
    public void run() {
   
        // 进行十次生产
        for (int i = 0; i < 10; i++) {
   
            try {
   
                System.out.println(Thread.currentThread().getName() + "生成产品:" + i);
                product.put(i);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }
        }
    }
}

结果:

全部评论

相关推荐

emmm别问我为啥上一条帖子隔了两个月我才开始投简历和拿offer,因为我懒😰简单流程如下:周一凌晨改好的简历,然后到处乱投简历;周二接到了三维家的一面通知,临时抱佛脚的背了一些八股;周三上午一面下午通知第二天hr面;周四上午hr面下午拿offer,遂收手支线:在BOSS上顺手投了几个大厂,投字节的时候不小心投城客户端了,结果过了一天HR突然把我简历要走了,还问我能不能整客户端,我直接一口答应(脏面评警告😢)结果在周三下午的时候给我打电话,说前端有空缺实习岗,问我有没有兴趣,然后就跟我约了周四下午一面😰我都没咋准备啊,咩都不会啊😭结果周四下午面完,晚上打电话通知过一面了,赶紧把二面约在下周一下午,留点缓冲时间。逆大天了,我一半的问题都不会,他居然给我过了?运气未免有点好了😥现在正在恶补计网、网安、性能优化的东西(这三大板块我是几乎一点不会,一面几乎一点答不出来,加上我又没怎么背八股,这块被干烂了😵)心得体会与经验:1.&nbsp;我giao怎么这么快就结束了,我还以为要找好久😨2.&nbsp;大厂的面试问题真的和中厂小厂很大不同,比如在三维家我能自己吹水到vue的数据劫持、Proxy代理响应式之类的他们就觉得很不错了,但是在字节你但凡敢提到一下就会追问你细节了,一追问马脚就全漏出来了3.&nbsp;有信心真的很重要,我感觉我能拿中厂offer最重要的就是吹水吹出自信来了,以至于三维家面试反问面试官有哪里还需要改进的时候,他就说很不错了解的很多😦4.&nbsp;理解很重要,我从头到尾真没背过很多八股,不过有一些知识确实是敲过代码验证过,所以面试的时候能吹水吹得出来😇想了解面经啥的可以直接评论区问我,但我可能也说不全,因为我没有记录,而且今天摆了一天感觉记忆快清空了😵下面是故事时间:我暑假刚开始的时候才开始准备八股,印象很深那个时候连什么原型、事件循环、闭包这些名词都没听过,资料也不知道怎么找,就一直零零散散的准备,感觉也只有js稍微背了一下八股,其他很多时候都是靠完全理解和手写熟悉一些机制的,但这样做效率很低,反正准备了一个多星期半个月就开摆了😭结果一摆就摆到了开学,笔记是乱七八糟的,八股是忘光光的,简历是一直没改的,实习也是一直没投过的。直到上周日晚上偶然和师兄聊天,他突然问我“你怎么还不找实习”,那天晚上才幡然醒悟,是时候做点事情了😡然后就按照上面描述的来走了。其实我感觉我从头到尾都没背特别多八股,也没怎么找刷题资料啥的,早期就是翻尚硅谷或者黑马的入门视频从头学起,中期用面试鸭看了一点点题,主要是在学js机制和敲js代码,后期才发现了w3c的面经网站,然后在那里看着学(那个时候已经懒得敲了,因为有些问题与代码感觉不像是给找实习的看的,忒细了点😂)接下来继续准备字节二面吧,虽然几乎没啥可能可以通过,但是万一有奇迹呢?😍😍😍也祝大家能够早日拿到心仪的offer
我的offer呢😡:我已经预见10天后你会发,节孝子启动了
投递三维家等公司10个岗位
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务