(二)Java中的并发队列和阻塞队列
并发队列
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队
列非阻塞队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。
阻塞队列与非阻塞队
1.阻塞队列
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue, (基于数组的并发阻塞队列)
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue, (带优先级的无界阻塞队列)
9.SynchronousQueue (并发同步阻塞队列)
简单测试:
package com.bestqiang.thread.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/** * @author BestQiang */
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// offer:不抛出异常,插入true,失败false
System.out.println(blockingQueue.offer("d"));
// 检查队首元素
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.remove());
}
}
true
true
true
false
a
a
a
Process finished with exit code 0
BlockingQueue最终会有四种状况,抛出异常、返回特殊值、阻塞、超时,下表总结了这些方法:
BlockingQueue是个接口,有如下实现类:
-
ArrayBlockQueue: 一个由数组支持的有界阻塞队列,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象。此队列按 FIFO(先进先出)原则对元素进行排序。创建其对象必须明确大小,像数组一样。在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
-
LinkedBlockQueue: 一个可改变大小的阻塞队列。其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。此队列按 FIFO(先进先出)原则对元素进行排序。创建其对象如果没有明确大小,默认值是Integer.MAX_VALUE。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
- DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
- PriorityBlockingQueue
类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
- SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
使用BlockingQueue模拟生产者与消费者点此链接查看:
使用BlockingQueue模拟生产者与消费者
2.非阻塞队列
ConcurrentLinkedQueue :
基于链接节点的、无界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。
总结
在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。
使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。
另外他们都是线程安全的,不用考虑线程同步问题。
这里的非阻塞与阻塞在于有界与否,也就是在初始化时有没有给它一个默认的容量大小,对于阻塞有界队列来讲,如果队列满了的话,则任何线程都会阻塞不能进行入队操作,反之队列为空的话,则任何线程都不能进行出队操作。而对于非阻塞无界队列来讲则不会出现队列满或者队列空的情况。它们俩都保证线程的安全性,即不能有一个以上的线程同时对队列进行入队或者出队操作。