关于多线程中的虚假唤醒的原因和解决,以及分别用Lock锁和阻塞队列实现消费者生产者案例
虚假唤醒: 两个线程以上会造成虚假唤醒的情况。虚假唤醒(spurious wakeup)是一个表象,即在多处理器的系统下发出wait的程序有可能在没有notify唤醒的情形下苏醒继续执行。以运行在linux的hotspot虚拟机上的java程序为例,wait方法在jvm执行时实质是调用了底层pthread_cond_wait/pthread_cond_timedwait函数,挂起等待条件变量来达到线程间同步通信的效果,而底层wait函数在设计之初为了不减慢条件变量操作的效率并没有去保证每次唤醒都是由notify触发,而是把这个任务交由上层应用去实现,即使用者需要定义一个循环去判断是否条件真能满足程序继续运行的需求,当然这样的实现也可以避免因为设计缺陷导致程序异常唤醒的问题。
解决: 所以为了避免这种情况,只好用while循环避免虚假唤醒。(因为if只判断一次,不能避免虚假唤醒)
下面是使用Lock锁完成消费者和生产者案例,里面使用了while循环避免了虚假唤醒。
思路是:定义一个资源类,里面先判断,再写代码,在通知,主线程通过子线程进行调用
package com.bestqiang.thread.Queue;
/** * @author BestQiang */
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * 题目:一个初始值为零的变量,两个线程对其交替操作,一个加1,一个减1,来5轮 * 1. 线程 操作 资源类 * 2. 判断 干活 通知 * 3. 防止虚假唤醒机制 */
public class ProdConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void decrement() throws Exception {
lock.lock();
try {
// 1.判断 使用while循环,避免虚假唤醒
while (number == 0) {
// 等待,不能生产
condition.await();
}
// 2.干活
number --;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3.通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void increment() throws Exception {
lock.lock();
try {
// 1.判断
while (number != 0) {
// 等待,不能生产
condition.await();
}
// 2.干活
number ++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3.通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
上面的那个例子,为什么number不加volatile,来保证它的可见性呢?原因是锁的Happens-Before原则,不知道Happens-Before的,请看 Happens-Before
接下来,使用阻塞队列和AtomicInteger完成生产者和消费者案例,此案例因为没有加锁,处于线程安全的考虑,变量加上volatile,采用CAS的原子类进行操作实现。
package com.bestqiang.thread.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyResource {
/** * 默认开启,进行生产+消费 */
private volatile boolean flag = true; //保证可见性
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (flag) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t大老板叫停了,表示FLAG=false,生产动作结束");
}
public void myConsumer() throws Exception {
String result = null;
while (flag) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
flag = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒钟没有取到蛋糕,消费失败");
} else {
System.out.println(Thread.currentThread().getName() + "\t 消费队列蛋糕" + result + "成功");
}
}
}
public void stop() {
this.flag = false;
}
}
/** * @author BestQiang * volatile/CAS/atomicInteger/BlockQueue/线程交互 */
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println("\t 生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println("\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
TimeUnit.SECONDS.sleep(5);
myResource.stop();
System.out.println("5秒钟时间到,大老板main线程叫停,消费退出");
}
}