【并发编程】生产者消费者3种方式
版本1 synchronized+notify+wait
/**
 * Producer/consumer, version 1: a single-slot shared resource guarded by this
 * object's intrinsic lock ({@code synchronized}) and coordinated with
 * {@code wait()}/{@code notifyAll()}.
 *
 * <p>Created 2019/12/29 14:57.
 *
 * @author i
 */
class MyShareData {
    /** Number of items currently available; the two methods keep it at 0 or 1. */
    private int count = 0;

    /**
     * Produces one item, blocking while the previous item has not been consumed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void increment() throws InterruptedException {
        // Re-check in a loop: a woken thread must verify the slot is really empty.
        while (count != 0) {
            this.wait();
        }
        count++;
        System.out.println(Thread.currentThread().getName() + " 生产了一个" + count);
        // notifyAll rather than notify: producers and consumers share one wait set,
        // so waking a single arbitrary thread could wake another producer and
        // leave every thread waiting.
        this.notifyAll();
    }

    /**
     * Consumes one item, blocking while no item is available.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void decrement() throws InterruptedException {
        // Wait while the slot is empty (count == 0).
        while (count == 0) {
            this.wait();
        }
        count--;
        System.out.println(Thread.currentThread().getName() + " 消费了一个" + count);
        // Wake all waiters for the same reason as in increment().
        this.notifyAll();
    }
}
/**
 * Demo driver for version 1: starts one producer thread ("t1") and one
 * consumer thread ("t2") that each operate on the same {@code MyShareData}
 * instance five times.
 */
public class ProducterAndCustomer01 {
    public static void main(String[] args) {
        final MyShareData shared = new MyShareData();

        // Producer task: five increment() calls; a failing call is reported
        // and the remaining rounds still run.
        Runnable produce = () -> {
            int round = 0;
            while (round < 5) {
                try {
                    shared.increment();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
                round++;
            }
        };

        // Consumer task: five decrement() calls with the same error handling.
        Runnable consume = () -> {
            int round = 0;
            while (round < 5) {
                try {
                    shared.decrement();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
                round++;
            }
        };

        new Thread(produce, "t1").start();
        new Thread(consume, "t2").start();
    }
}
版本2 lock+Condition+await+signal
Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用
/**
 * Producer/consumer, version 2: a single-slot shared resource guarded by an
 * explicit {@link Lock}, with {@link Condition} objects taking the place of the
 * {@code Object} monitor methods.
 *
 * <p>Created 2019/12/29 15:04.
 *
 * @author i
 */
class MyShareData {
    /** Number of items currently available; the two methods keep it at 0 or 1. */
    private int count = 0;
    private final Lock lock = new ReentrantLock();
    /** Signalled when the slot becomes empty; producers wait on it. */
    private final Condition notFull = lock.newCondition();
    /** Signalled when the slot becomes occupied; consumers wait on it. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Produces one item, blocking while the previous item has not been consumed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void increment() throws InterruptedException {
        // Acquire before entering try so that finally only unlocks a lock we hold.
        lock.lock();
        try {
            while (count != 0) {
                notFull.await();
            }
            count++;
            System.out.println(Thread.currentThread().getName() + " 生产了一个" + count);
            // Separate conditions ensure this wake-up reaches a consumer,
            // never another producer.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumes one item, blocking while no item is available.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            count--;
            System.out.println(Thread.currentThread().getName() + " 消费了一个" + count);
            // Wake a producer now that the slot is free.
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }
}
/**
 * Demo driver for version 2: one producer thread ("t1") and one consumer
 * thread ("t2") share a single {@code MyShareData} and each perform five
 * operations on it.
 */
public class ProducterAndCustomer02 {
    public static void main(String[] args) {
        final MyShareData resource = new MyShareData();

        // Producer: counts its five rounds down; an exception from one round
        // is printed and does not stop the remaining rounds.
        Thread producer = new Thread(() -> {
            for (int remaining = 5; remaining > 0; remaining--) {
                try {
                    resource.increment();
                } catch (Exception failure) {
                    failure.printStackTrace();
                }
            }
        }, "t1");
        producer.start();

        // Consumer: mirror image of the producer, calling decrement().
        Thread consumer = new Thread(() -> {
            for (int remaining = 5; remaining > 0; remaining--) {
                try {
                    resource.decrement();
                } catch (Exception failure) {
                    failure.printStackTrace();
                }
            }
        }, "t2");
        consumer.start();
    }
}
版本3 生产者消费者 volatile/BlockingQueue/AtomicInteger
版本1中通过 synchronized 来保证原子操作,但是锁的粒度比较大,属于重量级锁。使用 notify 和 wait 时无法指定唤醒哪一个线程,具有一定的随机性。
版本2中使用Lock 和 condition来进行控制
版本3 组合使用 volatile、BlockingQueue、AtomicInteger 来实现
/**
 * Producer/consumer, version 3: combines a {@code volatile} run flag, a
 * {@link BlockingQueue} and an {@link AtomicInteger}.
 *
 * <p>The producer puts consecutive numbers (as strings) on the queue about once
 * per second. The consumer takes them off and shuts both loops down when
 * nothing arrives within two seconds.
 *
 * <p>Created 2019/12/29 16:42.
 *
 * @author i
 */
public class ProdConsumer_BlockQueueDemo {
    /** Run flag read by both loops; volatile so a write by one thread is seen by the others. */
    private volatile boolean flag = true;
    /** Channel between producer and consumer, supplied by the caller. */
    private final BlockingQueue<String> blockingQueue;
    /** Source of the values that are produced. */
    private final AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * @param blockingQueue queue used to hand items from the producer to the consumer
     */
    public ProdConsumer_BlockQueueDemo(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /**
     * Producer loop: while the run flag is set, generates the next number,
     * offers it to the queue (waiting up to one second for space) and then
     * sleeps for one second.
     *
     * @throws InterruptedException if interrupted while offering or sleeping
     */
    public void pro() throws InterruptedException {
        while (flag) {
            String str = String.valueOf(atomicInteger.incrementAndGet());
            System.out.println(Thread.currentThread().getName() + "\t 生产了 " + str);
            // A timed offer returns false when no space became available in time;
            // report it instead of silently dropping the item.
            boolean accepted = blockingQueue.offer(str, 1, TimeUnit.SECONDS);
            if (!accepted) {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列失败 " + str);
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + " boss叫停了服务 生产者退出。。。");
    }

    /**
     * Consumer loop: while the run flag is set, polls the queue with a
     * two-second timeout. A timeout (or an empty string) clears the run flag,
     * which also stops the producer, and ends the loop.
     *
     * @throws InterruptedException if interrupted while polling
     */
    public void con() throws InterruptedException {
        while (flag) {
            String str = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (str == null || str.isEmpty()) {
                flag = false;
                System.out.println(Thread.currentThread().getName() + "消费者退出。。。");
                return;
            }
            System.out.println(Thread.currentThread().getName() + " \t 消费了 " + str);
        }
    }

    /** Clears the run flag so both loops finish after their current iteration. */
    public void stop() {
        flag = false;
    }

    /**
     * Runs the demo: starts producer thread "p" and consumer thread "c" on a
     * queue of capacity 10, then requests a stop after five seconds.
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
        ProdConsumer_BlockQueueDemo p = new ProdConsumer_BlockQueueDemo(blockingQueue);
        new Thread(() -> {
            try {
                p.pro();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "p").start();
        new Thread(() -> {
            try {
                p.con();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "c").start();
        TimeUnit.SECONDS.sleep(5);
        p.stop();
    }
}