Java阻塞队列
Java中的阻塞队列
1.什么是阻塞队列
阻塞队列(BlockingQueue),就是支持阻塞插入和阻塞移除的队列。
1)阻塞插入:当队列满时,会阻塞插入元素的线程,知道队列不为满时。
2) 阻塞移除:当队列为空时,获取元素的线程会阻塞,等待队列变为非空。
2.运用场景
阻塞队列可用于生产者和消费者问题,生产者就是向队列里插入元素,而消费者就是从队列里获取元素。这时,阻塞队列就可以充当生产者存元素、消费者取元素的容器。
3.消息介绍
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
抛出异常: 当队列满插入或者队列空移除时,会抛出相应的异常。
返回特殊值:插入队列返回插入是否成功,移除队列,返回相应的值,没有则返回null。
一直阻塞:也就上面的消费者和生产者的情况,队列满插入则阻塞,队列空移除也是阻塞。
超时退出:同样的道理,也是会阻塞一段时间,超时则线程退出。
4.JDK提供的阻塞队列API
jdk1.8中的阻塞队列 ,接口BlockingQueue,继承结构
ArrayBlockingQueue: 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue: 链表结构的有界阻塞队列
PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
DelayQueue: 使用优先级队列实现的无界阻塞队列
SynchronousQueue: 不存储元素的阻塞队列
LinkedTransferQueue: 链表结构的无界阻塞队列
LinkedBlockingDeque: 链表结构的双向阻塞队列
5.讨论ArrayBlockingQueue
阻塞队列很多,我们拿一个来举例;
JDK8中的部分源码
三个构造方法,可以发现,ArrayBlockingQueue必须知道大小,默认是非公平的
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and default access policy. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity < 1} */
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity, the specified access policy and initially containing the * elements of the given collection, * added in traversal order of the collection's iterator. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @param c the collection of elements to initially contain * @throws IllegalArgumentException if {@code capacity} is less than * {@code c.size()}, or less than 1. * @throws NullPointerException if the specified collection or any * of its elements are null */
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
一些属性:可以发现,使用了ReentrantLock来实现访问公平性。
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */
transient Itrs itrs = null;
提问:如果队列是空的,消费者一直等待,当生产者添加元素时,消费者如何知道队列由元素?
使用通知模式实现 ,所谓通知模式,就是当生产者往满的队列添加元素时,会发生阻塞,当消费者消费了一个元素后,会通知生产者当前队列可用,JDK源码中使用了Condition来实现:
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
// 省略 代码
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 省略代码
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
// 省略代码
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
6.阻塞队列演示消费者与生产者模型
public class TestConsumerAndProducerBlockingQueue {
public static void main(String[] args) {
// 进行测试 创建一个销售员对象
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 创建两个消费者,两个生产者
new Thread(new TestConsumer(queue),"消费者A").start();
new Thread(new TestProducer(queue),"生产者B").start();
new Thread(new TestConsumer(queue),"消费者C").start();
new Thread(new TestProducer(queue),"生产者D").start();
}
}
// 定义消费者
class TestConsumer implements Runnable {
private ArrayBlockingQueue<Integer> product;
public TestConsumer(ArrayBlockingQueue queue) {
this.product = queue;
}
@Override
public void run() {
// 进行十次消费
for (int i = 0; i < 10; i++) {
try {
System.out.println(Thread.currentThread().getName() + "消费了:" + product.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 定义生产者
class TestProducer implements Runnable {
private ArrayBlockingQueue<Integer> product;
public TestProducer(ArrayBlockingQueue queue) {
this.product = queue;
}
@Override
public void run() {
// 进行十次生产
for (int i = 0; i < 10; i++) {
try {
System.out.println(Thread.currentThread().getName() + "生成产品:" + i);
product.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
结果: