【并发编程】生产者消费者3种方式

版本1 synchronized+notify+wait

/**
 * @author i
 * @create 2019/12/29 14:57
 * @Description 生产者消费者 1 版本
 */
class MyShareData{

    private Integer count = 0;//资源

    public synchronized void increment()throws  Exception{
        while (count != 0){
            //如果count 不等于0 当前线程等待
            this.wait();
        }
        count++;
        System.out.println(Thread.currentThread().getName()+" 生产了一个"+count);
        this.notify();
    }

    public synchronized void decrement()throws  Exception{
        while (count == 0){
            //如果count 不等于0 当前线程等待
            this.wait();
        }
        count--;
        System.out.println(Thread.currentThread().getName()+" 消费了一个"+count);
        this.notify();
    }

}

public class ProducterAndCustomer01 {

    public static void main(String[] args){
        MyShareData myShareData = new MyShareData();
        //生产者
        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    myShareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"t1").start();
        //消费者
        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    myShareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"t2").start();
    }

}

版本2 lock+Condition+awiait+singal

Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用

/**
 * @author i
 * @create 2019/12/29 15:04
 * @Description 生产者 消费者 2版本
 *  Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用
 *
 */
class MyShareData {

    private Integer count = 0;

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    //生产
    public void increment() throws Exception {
        try {

            lock.lock();
            while (count != 0) {
                condition.await();
            }
            count++;
            System.out.println(Thread.currentThread().getName() + " 生产了一个" + count);
            condition.signal();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    //消费
    public void decrement() throws Exception {
        try {
            lock.lock();
            while (count == 0) {
                condition.await();
            }
            count--;
            System.out.println(Thread.currentThread().getName() + " 消费了一个" + count);
            condition.signal();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

public class ProducterAndCustomer02 {

    public static void main(String[] args) {
        MyShareData myShareData = new MyShareData();
        //生产者
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    myShareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();
        //消费者
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    myShareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "t2").start();
    }

}

版本3 生产者消费者 volatile/BlockingQueue/AtomicInteger

版本1中通过syn来保证原子操作,但是锁的粒度比较大。重量级。使用notify和wait 并且不确定唤醒哪一个线程有一定的随机性。

版本2中使用Lock 和 condition来进行控制

版本3 使用volatile BlockingQueue AtomicInteger 来实现组合使用

/**
 * @author i
 * @create 2019/12/29 16:42
 * @Description 生产者消费者 volatile/BlockingQueue/AtomicInteger
 *  
 */
public class ProdConsumer_BlockQueueDemo {

    private volatile boolean flag = true;//标志位
    private BlockingQueue blockingQueue = null;
    private AtomicInteger atomicInteger = new AtomicInteger();

    //构造
    public ProdConsumer_BlockQueueDemo(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    //生产者
    public void pro() throws InterruptedException {
        String str = null;
        while (flag) {
            str = atomicInteger.incrementAndGet()+"";//原子操作增加值
            System.out.println(Thread.currentThread().getName() + "\t 生产了 "+str);
            blockingQueue.offer(str,1,TimeUnit.SECONDS);//生产
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+" boss叫停了服务 生产者退出。。。");
    }

    //消费者
    public void con()throws  Exception{
        String str = null;
        while (flag){
            str = (String) blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (null == str || str.equalsIgnoreCase("")){
                flag = false;
                System.out.println(Thread.currentThread().getName()+"消费者退出。。。");
                return;
            }
            System.out.println(Thread.currentThread().getName()+" \t 消费了 "+str);
        }
    }

    public void stop(){
        flag = false;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue = new ArrayBlockingQueue(10);
        ProdConsumer_BlockQueueDemo p = new ProdConsumer_BlockQueueDemo(blockingQueue);

        new Thread(()->{
            try {
                p.pro();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"p").start();

        new Thread(()->{
            try {
                p.con();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"c").start();

        TimeUnit.SECONDS.sleep(5);
        p.stop();
    }

}

 

 

 

 

 

 

 

 

 

全部评论

相关推荐

11-05 07:29
贵州大学 Java
点赞 评论 收藏
分享
安静的垂耳兔在泡澡:ks已经第八次投递了,它起码挂了还让你再投,不错了
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务