Java阻塞队列

Java中的阻塞队列

1.什么是阻塞队列

阻塞队列(BlockingQueue),就是支持阻塞插入和阻塞移除的队列。
1)阻塞插入:当队列满时,会阻塞插入元素的线程,知道队列不为满时。
2) 阻塞移除:当队列为空时,获取元素的线程会阻塞,等待队列变为非空。

2.运用场景

阻塞队列可用于生产者和消费者问题,生产者就是向队列里插入元素,而消费者就是从队列里获取元素。这时,阻塞队列就可以充当生产者存元素、消费者取元素的容器。

3.消息介绍

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

抛出异常: 当队列满插入或者队列空移除时,会抛出相应的异常。
返回特殊值:插入队列返回插入是否成功,移除队列,返回相应的值,没有则返回null。
一直阻塞:也就上面的消费者和生产者的情况,队列满插入则阻塞,队列空移除也是阻塞。
超时退出:同样的道理,也是会阻塞一段时间,超时则线程退出。

4.JDK提供的阻塞队列API

jdk1.8中的阻塞队列 ,接口BlockingQueue,继承结构


ArrayBlockingQueue: 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue: 链表结构的有界阻塞队列
PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
DelayQueue: 使用优先级队列实现的无界阻塞队列
SynchronousQueue: 不存储元素的阻塞队列
LinkedTransferQueue: 链表结构的无界阻塞队列
LinkedBlockingDeque: 链表结构的双向阻塞队列

5.讨论ArrayBlockingQueue

阻塞队列很多,我们拿一个来举例;
JDK8中的部分源码
三个构造方法,可以发现,ArrayBlockingQueue必须知道大小,默认是非公平的

 /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and default access policy. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity < 1} */
    public ArrayBlockingQueue(int capacity) {
   
        this(capacity, false);
    }

    /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */
    public ArrayBlockingQueue(int capacity, boolean fair) {
   
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity, the specified access policy and initially containing the * elements of the given collection, * added in traversal order of the collection's iterator. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @param c the collection of elements to initially contain * @throws IllegalArgumentException if {@code capacity} is less than * {@code c.size()}, or less than 1. * @throws NullPointerException if the specified collection or any * of its elements are null */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
   
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
   
            int i = 0;
            try {
   
                for (E e : c) {
   
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
   
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
   
            lock.unlock();
        }
    }

一些属性:可以发现,使用了ReentrantLock来实现访问公平性。

 /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */
    transient Itrs itrs = null;

提问:如果队列是空的,消费者一直等待,当生产者添加元素时,消费者如何知道队列由元素?
使用通知模式实现 ,所谓通知模式,就是当生产者往满的队列添加元素时,会发生阻塞,当消费者消费了一个元素后,会通知生产者当前队列可用,JDK源码中使用了Condition来实现:

   /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    // 省略 代码
 	public ArrayBlockingQueue(int capacity, boolean fair) {
   
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    // 省略代码
      /** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
    public void put(E e) throws InterruptedException {
   
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
   
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
   
            lock.unlock();
        }
    }
    // 省略代码
    public E take() throws InterruptedException {
   
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
   
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
   
            lock.unlock();
        }
    }

6.阻塞队列演示消费者与生产者模型

public class TestConsumerAndProducerBlockingQueue {
   

    public static void main(String[] args) {
   
        // 进行测试 创建一个销售员对象
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // 创建两个消费者,两个生产者
        new Thread(new TestConsumer(queue),"消费者A").start();
        new Thread(new TestProducer(queue),"生产者B").start();
        new Thread(new TestConsumer(queue),"消费者C").start();
        new Thread(new TestProducer(queue),"生产者D").start();

    }

}

// 定义消费者
class TestConsumer implements Runnable {
   

    private ArrayBlockingQueue<Integer> product;

    public TestConsumer(ArrayBlockingQueue queue) {
   
        this.product = queue;
    }

    @Override
    public void run() {
   
        // 进行十次消费
        for (int i = 0; i < 10; i++) {
   
            try {
   
                System.out.println(Thread.currentThread().getName() + "消费了:" + product.take());
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }
        }
    }
}

// 定义生产者
class TestProducer implements Runnable {
   

    private ArrayBlockingQueue<Integer> product;

    public TestProducer(ArrayBlockingQueue queue) {
   
        this.product = queue;
    }

    @Override
    public void run() {
   
        // 进行十次生产
        for (int i = 0; i < 10; i++) {
   
            try {
   
                System.out.println(Thread.currentThread().getName() + "生成产品:" + i);
                product.put(i);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }
        }
    }
}

结果:

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
正在热议
更多
# 春招至今,你的战绩如何? #
9578次浏览 87人参与
# 你的实习产出是真实的还是包装的? #
1710次浏览 40人参与
# 巨人网络春招 #
11301次浏览 223人参与
# 军工所铁饭碗 vs 互联网高薪资,你会选谁 #
7449次浏览 43人参与
# 简历第一个项目做什么 #
31547次浏览 330人参与
# 重来一次,我还会选择这个专业吗 #
433345次浏览 3926人参与
# MiniMax求职进展汇总 #
23826次浏览 308人参与
# 当下环境,你会继续卷互联网,还是看其他行业机会 #
186964次浏览 1120人参与
# 牛客AI文生图 #
21408次浏览 238人参与
# 不考虑薪资和职业,你最想做什么工作呢? #
152287次浏览 887人参与
# 研究所笔面经互助 #
118866次浏览 577人参与
# 简历中的项目经历要怎么写? #
310048次浏览 4192人参与
# AI时代,哪些岗位最容易被淘汰 #
63387次浏览 803人参与
# 面试紧张时你会有什么表现? #
30485次浏览 188人参与
# 你今年的平均薪资是多少? #
213006次浏览 1039人参与
# 你怎么看待AI面试 #
179843次浏览 1234人参与
# 高学历就一定能找到好工作吗? #
64307次浏览 620人参与
# 你最满意的offer薪资是哪家公司? #
76432次浏览 374人参与
# 我的求职精神状态 #
447981次浏览 3128人参与
# 正在春招的你,也参与了去年秋招吗? #
363237次浏览 2637人参与
# 腾讯音乐求职进展汇总 #
160581次浏览 1111人参与
# 校招笔试 #
470371次浏览 2963人参与
牛客网
牛客网在线编程
牛客网题解
牛客企业服务