手写生产者消费者
生产者消费者--传统
解决多线程
1、线程 操作 资源类
2、判断 干活 通知
public class ProcTradition { //1、线程 操作 资源类 //2、判断 干活 通知 //实现生产者消费者 //一个线程生产,一个线程消费,生产完后才能消费 public static void main(String[] args) { ShareDate shareDate = new ShareDate(); new Thread(()->{ for (int i = 0; i < 5; i++) { shareDate.produce(); } },"线程1").start(); new Thread(()->{ for (int i = 0; i < 5; i++) { shareDate.consume(); } },"线程2").start(); new Thread(()->{ for (int i = 0; i < 5; i++) { shareDate.produce(); } },"线程3").start(); new Thread(()->{ for (int i = 0; i < 5; i++) { shareDate.consume(); } },"线程4").start(); } } class ShareDate{ // 资源类 private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); //生产 public void produce(){ try { lock.lock(); //需要使用while 来循环判断 使用if 可能存在虚假唤醒的时候 while (number != 0){ // 不是 0 的话就去等待,否则就去生产 //判断 condition.await(); } number++; //干活 System.out.println(Thread.currentThread().getName()+"生产"+number); condition.signalAll(); //通知 }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void consume() { try { lock.lock(); while (number == 0 ){ try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } number--; System.out.println(Thread.currentThread().getName()+"消费"+number); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
结果:
手写生产者消费者--新版
基于
BlockingQueue
volatile
AtomicInteger
public class ProcNew { public static void main(String[] args) throws InterruptedException { MySource mySource = new MySource(new ArrayBlockingQueue(5)); new Thread(() -> { try { mySource.producer(); } catch (InterruptedException e) { e.printStackTrace(); } }, "producer").start(); new Thread(() -> { try { mySource.consumer(); } catch (Exception e) { e.printStackTrace(); } }, "consumer").start(); TimeUnit.SECONDS.sleep(5); mySource.stop(); } } class MySource { private volatile boolean FLAG = true; private AtomicInteger number = new AtomicInteger(); // 初始值为0 private BlockingQueue blockingQueue = null; public MySource(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; System.out.println("使用\t" + blockingQueue.getClass().getName()); System.out.println("--------------------"); } public void stop(){ this.FLAG = false; } public void producer() throws InterruptedException { String data = null; boolean resu; while (FLAG) { //生产者生产 data = number.incrementAndGet() + ""; resu = blockingQueue.offer(data, 3, TimeUnit.SECONDS); if (resu) { System.out.println(Thread.currentThread().getName() + " 生产" + data); } else { System.out.println("生产失败" + data); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("FLAG为false,生产结束************"); } public void consumer() throws Exception { String data = null; while (FLAG) { data = (String) blockingQueue.poll(3L, TimeUnit.SECONDS); if (null == data || data.equalsIgnoreCase("")){ FLAG = false; System.out.println("超过3秒没取到值,消费结束"); return; } System.out.println(Thread.currentThread().getName() + " 消费" + data); } }
在写线程资源类的时候,我们没有写死使用哪种 阻塞队列,而是使用了构造注入,在用户使用的时候指定,提高了代码的可用性!
这是高手的必经之路!!!
结果:
相比以前,现在实现的生产者消费者有什么好处?
我们使用了 BlockingQueue 不用程序员去手动的通知线程进行通信了。
以前要生产5次是这么写的
new Thread(()->{ for (int i = 0; i < 5; i++) { shareDate.produce(); } },"线程1").start();现在,我们就可以直接说停就听。