阻塞队列中的线程协作(阻塞、唤醒、锁)

自己写一个阻塞队列

阻塞队列,主要操作有两个,一个是put放入元素,另一个是take取出元素。所谓的阻塞就是当多个线程同时存取数据时,如果遇到队列为空或者队列为满时,会发生阻塞。并且多个线程同时执行take或者put操作时,某一时刻只有一个线程获得执行权利,也就是执行任何一个操作之前需要获得锁,没有获得锁的线程发生阻塞。

put: 向队列中存入一个元素,如果已满,则阻塞当前线程,等待唤醒。如果正常存入了元素,那么唤醒其他阻塞的线程(有些执行take操作的线程因为队列为空而阻塞)

take: 从队列中取一个元素,如果队列为空,则阻塞当前线程,等待唤醒。如果正常取出了元素,那么唤醒其他阻塞的线程(有些执行put操作的线程因为队列满而阻塞)

Object类提供了几个操作来进行当前线程的唤醒和阻塞
wait: 阻塞当前线程,其实就是将当前线程放入当前对象的等待集中,释放锁(如果持有锁的话),暂停当前线程。
notify: 唤醒当前对象等待集上的一个线程。
notifyAll: 唤醒当前对象等待集上的所有线程。

基于以上,我们实现一个自己的阻塞队列:

public class MyBlockingQueue1<T> implements MyBlockingQueue<T>{
    private Object[] array;
    private int count=0;
    private int getIndex=0;
    private int putIndex=0;
    public MyBlockingQueue1(int cap){
        array = new Object[cap];
    }
    public synchronized void put(T ele) throws InterruptedException {
        while (isFull()){
            wait();
        }
        array[putIndex++]=ele;
        if (putIndex>=array.length){
            putIndex=0;
        }
        count++;
        notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (isEmpty()){
            wait();
        }
        Object element = array[getIndex++];
        if (getIndex>=array.length){
            getIndex=0;
        }
        count--;
        notifyAll();
        @SuppressWarnings("unchecked")
        T t = (T)element;
        return t;
    }
    private boolean isEmpty(){
        return count==0;
    }
    private boolean isFull(){
        return  count>=array.length;
    }
}

put和take方法都加了synchronized,也就是说这两个方法执行之前都需要先取得同一个对象锁,从而,这两个方法就不可以并行执行。于是我们可以稍微优化一下,比如put和take使用两个不同的锁,这两个操作就不会互相影响了。但也会因此使得count成为了临界资源,count++会发生竞态,我们可以考虑使用一个原子变量类来替代int类型。而且上面介绍提到的唤醒部分,每当成功put或者成功take,我们都唤醒所有线程,其实put操作成功时,我们只想唤醒那些因为队列为空而阻塞的线程,take操作成功时,我们只想唤醒那些因为队列已满而阻塞的线程,而且唤醒一个就够了。于是我们可以使用Condition来使得线程在两个不同的等待队列上进行等待,每次都唤醒特定队列上的一个线程。于是0.2版代码如下:

public class MyBlockingQueue2<T> implements MyBlockingQueue<T>{

    private Object[] array;
    private AtomicInteger count=new AtomicInteger(0);//临界资源,使用原子变量类
    private int getIndex=0;
    private int putIndex=0;

    private ReentrantLock putLock = new ReentrantLock();
    private final Condition notEmpty = putLock.newCondition();//防止过早唤醒
    private ReentrantLock takeLock = new ReentrantLock();
    private final Condition notFull = takeLock.newCondition();//防止过早唤醒

    public MyBlockingQueue2(int cap){
        array = new Object[cap];
    }
    public void put(T ele) throws InterruptedException {
        try {
            putLock.lock();
            while (isFull()){
                notFull.await();
            }
            array[putIndex++]=ele;
            if (putIndex>=array.length){
                putIndex=0;
            }
            int c = count.getAndIncrement();
            if (c==0){
                notEmpty.signal();
            }
        }finally {
            putLock.unlock();
        }

    }

    public  T take() throws InterruptedException {
        try {
            takeLock.lock();
            while (isEmpty()){
                notEmpty.wait();
            }
            Object element = array[getIndex++];
            if (getIndex>=array.length){
                getIndex=0;
            }
            int c = count.getAndDecrement();
            if (c==array.length){
                notFull.signal();
            }
            @SuppressWarnings("unchecked")
            T t = (T)element;
            return t;
        }finally {
            takeLock.unlock();
        }

    }
    private boolean isEmpty(){
        return count.get()==0;
    }
    private boolean isFull(){
        return  count.get()>=array.length;
    }
}

JDK中的阻塞队列实现

我们自己写的这个阻塞队列只是实现了最基本的put和take两个操作,而jdk中的阻塞队列提供的功能更加全面一些。首先,提供了put和take对应的非阻塞方法offer和poll,这两个方法,即使遇到队列为满或为空的情况,也不会阻塞当前线程,而是直接返回false或null。并且还提供了阻塞时间选项,比如,poll时,如果队列为空,可以选择阻塞x秒,如果x秒内还是没能拿到元素,则返回null。其次还提供了比如drainTo、contains、remove等方法来完成一次性取出所有元素,判断元素存在与否,移除一个元素等操作,作为阻塞队列的接口BlockingQueue主要有四个实现类:
ArrayBlockingQueue:这个是用数组实现的一个阻塞队列,put和take使用了同一个锁,线程等待队列使用了Condition。
LinkedBlockingQueue:这个是用链表实现的一个阻塞队列,put和take使用了 两个锁,理论上支持更大的并发量。
还有就是PriorityBlockingQueue和SynchronousQueue,一个是优先级阻塞队列,每次都按照优先级来存取元素,另一个是同步队列,其实它内部没有维护队列,而是存入一个元素之后,必须有其他线程将他取走,不然再想put的线程就会被阻塞。这两个队列内部实现跟前两个有所不同,看起来要更复杂一点,比如PriorityBlockingQueue内部是通过堆来维护优先级的,优先级比对我我们可以存入自己的比较器,而SynchronousQueue内部通过Transferer分装了一些操作,这两个队列待独立一篇细说。

完整代码请访问小火箭

全部评论

相关推荐

10-06 12:46
门头沟学院 Java
跨考小白:定时任务启动
点赞 评论 收藏
分享
拉丁是我干掉的:把上海理工大学改成北京理工大学。成功率增加200%
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务