生产消费者模式
多线程知识点
- synchronized修饰在成员方法上,其实相当于
synchronized(this) { ... }
,即锁在当前对象的monitor上。
public synchronized void sync { /* process */ } // 相当于下面 public void sync { synchronized(this) { /* process */ } }
- 在synchronized方法里,或者synchronized同步代码块中,只要正在执行中,就说明currentThread已经获得当前对象的monitor了。
- 当synchronized方法或synchronized同步代码块结束,currentThread会自动释放对象的monitor。
- wait() \ notify() \ notifyAll()这三方法必须在synchronized方法或synchronized同步代码块中执行,因为这样就说明currentThread已经获得了对象的monitor。
- wait()使得当前线程释放已获得的对象monitor,并陷入一种等待。这种等待必须依靠别的获得同一个对象monitor的线程来调用notify() \ notifyAll()才会重新唤醒,但重新唤醒后需要继续执行没执行完的同步代码,而执行同步代码的前提是获得被调用成员方法的对象的monitor。所以,一个被notify() \ notifyAll()的调用而从wait()中被唤醒后的线程,是不一定会马上执行wait()的下一句代码的,因为它需要和其他竞争同一个对象monitor的线程进行竞争,如果竞争失败了,那么该线程还是只有阻塞在wait()这里,直到它竞争到对象monitor。
- notify()使得wait在同一个对象monitor上的某一个线程被唤醒。另外,synchronized代码执行完毕后,会释放对象monitor,当然,这一点跟notify()无关,因为本来就是这样。
- notifyAll()使得wait在同一个对象monitor上的所有线程被唤醒。
生产者消费者示例
public class SynProCon { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } class Data { private int number = 0; // +1 public synchronized void increment() throws InterruptedException { if (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "执行加1完毕,number=" + number + ",准备通知其他线程"); // 通知 this.notifyAll(); } // -1 public synchronized void decrement() throws InterruptedException { if (number == 0) { // 等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + "执行减1完毕,number=" + number + ",准备通知其他线程"); // 通知 this.notifyAll(); } } /** * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 */
当使用两条线程调用Data的加一和减一方法时,结果没有出现错误。
现在使用四条线程:
public class SynProCon { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } class Data { private int number = 0; // +1 public synchronized void increment() throws InterruptedException { if (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "执行加1完毕,number=" + number + ",准备通知其他线程"); // 通知 this.notifyAll(); } // -1 public synchronized void decrement() throws InterruptedException { if (number == 0) { // 等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + "执行减1完毕,number=" + number + ",准备通知其他线程"); // 通知 this.notifyAll(); } } /** * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-0执行加1完毕,number=2,准备通知其他线程 * Thread-2执行加1完毕,number=3,准备通知其他线程 * Thread-1执行减1完毕,number=2,准备通知其他线程 * Thread-1执行减1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-0执行加1完毕,number=2,准备通知其他线程 * Thread-3执行减1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-2执行加1完毕,number=2,准备通知其他线程 * Thread-1执行减1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-0执行加1完毕,number=2,准备通知其他线程 * Thread-3执行减1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-2执行加1完毕,number=2,准备通知其他线程 * Thread-1执行减1完毕,number=1,准备通知其他线程 * Thread-2执行加1完毕,number=2,准备通知其他线程 * Thread-0执行加1完毕,number=3,准备通知其他线程 * Thread-3执行减1完毕,number=2,准备通知其他线程 * Thread-3执行减1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 */
可以看到number的值有时候会变成3或者2,这并不是所期望的答案。
是什么原因导致的?
虚假唤醒问题
什么是虚假唤醒?
举个例子,我们现在有一个生产者、消费者、队列和三个线程。
1号线程和2号线程扮演消费者
3号线程扮演生产者
假设3号线程已经在队列添加了一个元素。
现在:
1) 1号线程从队列中获取了一个元素,此时队列变为空。
2) 2号线程也想从队列中获取一个元素,但此时队列为空,2号线程便只能进入阻塞(cond.wait()),等待队列非空。
3) 这时,3号线程将一个元素入队,并调用cond.notify()唤醒条件变量。
4) 处于等待状态的2号线程接收到3号线程的唤醒信号,便准备解除阻塞状态,执行接下来的任务(获取队列中的元素)。
5) 然而可能出现这样的情况:当2号线程准备获得队列的锁,去获取队列中的元素时,1号线程获得了队列的锁,检查到队列非空,就获取到了3号线程刚刚入队的元素,然后释放队列锁。
6) 等到2号线程获得队列锁,此时,如果是if判断,那么上面的等待代码执行完毕之后,便直接跳出if代码块,直接执行消费逻辑,者就引发了队列元素数量不正确的问题。1号线程“偷走了”这个元素,所以对于2号线程而言,这次唤醒就是“虚假”的,它需要再次等待队列非空,所以需要使用while再次判断,如果队列为空就继续等待。
修改后的代码:
public class SynProCon { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } class Data { private int number = 0; // +1 public synchronized void increment() throws InterruptedException { while (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "执行加1完毕,number=" + number + ",准备通知其他线程"); // 通知 this.notifyAll(); } // -1 public synchronized void decrement() throws InterruptedException { while (number == 0) { // 等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + "执行减1完毕,number=" + number + ",准备通知其他线程"); // 通知 this.notifyAll(); } } /** * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-1执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-0执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 * Thread-2执行加1完毕,number=1,准备通知其他线程 * Thread-3执行减1完毕,number=0,准备通知其他线程 */
使用Lock方式实现生产消费问题
public class LockProCon { public static void main(String[] args) { Data2 data2 = new Data2(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data2.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()-> { for (int i = 0; i < 10; i++) { try { data2.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data2.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()-> { for (int i = 0; i < 10; i++) { try { data2.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } class Data2 { private int number = 0; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); // +1 public void increment() throws InterruptedException { // 加锁 lock.lock(); try { while (number != 0) { // 等待 System.out.println("生产线程" + Thread.currentThread().getName() + ",number=" + number + ",不能执行加1操作,需要等待"); condition.await(); } // System.out.println("生产线程" + Thread.currentThread().getName() + "执行加一,number=" + ++number + ",通知其他线程工作"); // 通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } // -1 public void decrement() throws InterruptedException { // 加锁 lock.lock(); try { while (number == 0) { System.out.println("消费线程" + Thread.currentThread().getName() + "number=" + number + ",不能执行减1操作,需要等待"); // 等待 condition.await(); } System.out.println("消费线程" + Thread.currentThread().getName() + "执行减一,number=" + --number + ",通知其他线程工作"); // 通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { // 解锁 lock.unlock(); } } } /** * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-0,number=1,不能执行加1操作,需要等待 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-1number=0,不能执行减1操作,需要等待 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-0执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-1执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 生产线程Thread-2,number=1,不能执行加1操作,需要等待 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 * 消费线程Thread-3number=0,不能执行减1操作,需要等待 * 生产线程Thread-2执行加一,number=1,通知其他线程工作 * 消费线程Thread-3执行减一,number=0,通知其他线程工作 */
可以看到上面的输出没有问题,但是线程的调用混乱,现在新增一个需要,要精准通知某条线程运行。
使用Condition实现精准通知唤醒线程
package com.pan.productconsumer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @program: juc * @description: 精准通知唤醒线程 * @author: * @create: 2023-04-01 21:39 **/ public class ConditionTest { public static void main(String[] args) { Data3 data3 = new Data3(); new Thread(()->{for (int i = 0; i < 10; i++) data3.printA();}, "A").start(); new Thread(()->{for (int i = 0; i < 10; i++) data3.printB();}, "B").start(); new Thread(()->{for (int i = 0; i < 10; i++) data3.printC();}, "C").start(); } } class Data3 { private int number = 1; private Lock lock = new ReentrantLock(); // 使用三个监视器,实现精准通知 private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void printA() { lock.lock(); try { while (number != 1) { // 监视器1进入等待状态 condition1.await(); } number = 2; System.out.println(Thread.currentThread().getName() + "->AAAAAAA"); // 通知监视器2 condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (number != 2) { // 监视器2进入等待状态 condition2.await(); } number = 3; System.out.println(Thread.currentThread().getName() + "->BBBBBBB"); // 通知监视器3 condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { while (number != 3) { // 监视器3进入等待状态 condition3.await(); } number = 1; System.out.println(Thread.currentThread().getName() + "->CCCCCCC"); // 通知监视器1 condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }#JUC#