用wait和notifyAll实现:
/**
 * Bounded producer/consumer simulation coordinated with {@code wait}/{@code notifyAll}.
 *
 * <p>A single counter models how many resources are currently in the buffer. Producer
 * threads increment it, consumer threads decrement it, and both wait on the monitor of
 * the {@code ProducerAndConsumer} object handed to them while they cannot make progress.
 */
class ProducerAndConsumer {
    /** Resources currently in the buffer; only accessed inside the synchronized blocks below. */
    private int capacity;
    /** Upper bound for {@link #capacity}; producers wait while it is reached. */
    private final int MAX_CAPACITY;

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of resources the buffer may hold; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative, because
     *         producers could then never produce (0) or would never be bounded (negative)
     */
    public ProducerAndConsumer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.MAX_CAPACITY = capacity;
        this.capacity = 0;
    }

    /**
     * Starts a new consumer thread.
     *
     * @param name name given to the started thread
     */
    public void putConsumer(String name) {
        // "this" doubles as the lock object: the workers synchronize on it.
        Thread consumer = new Thread(new Consumer(this));
        consumer.setName(name);
        consumer.start();
    }

    /**
     * Starts a new producer thread.
     *
     * @param name name given to the started thread
     */
    public void putProducer(String name) {
        Thread producer = new Thread(new Producer(this));
        producer.setName(name);
        producer.start();
    }

    /** Worker that repeatedly takes one resource, waiting while the buffer is empty. */
    class Consumer implements Runnable {
        /** Object whose monitor guards the shared counter. */
        ProducerAndConsumer producerAndConsumer;

        public Consumer(ProducerAndConsumer producerAndConsumer) {
            this.producerAndConsumer = producerAndConsumer;
        }

        /** Loops until the thread is interrupted. */
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep((int)(Math.random() * 100 + 510));
                    synchronized (producerAndConsumer) {
                        // Re-check in a loop: notifyAll wakes producers and consumers alike.
                        while (capacity == 0) {
                            producerAndConsumer.wait();
                        }
                        capacity--;
                        System.out.println(Thread.currentThread().getName() + "消费了一个资源,目前还剩" + capacity + "个资源");
                        producerAndConsumer.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
                // Restore the flag so code further up the stack can still see the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Worker that repeatedly adds one resource, waiting while the buffer is full. */
    class Producer implements Runnable {
        /** Object whose monitor guards the shared counter. */
        ProducerAndConsumer producerAndConsumer;

        public Producer(ProducerAndConsumer producerAndConsumer) {
            this.producerAndConsumer = producerAndConsumer;
        }

        /** Loops until the thread is interrupted. */
        @Override
        public void run() {
            try { // an interrupt ends the loop
                while (true) {
                    Thread.sleep((int)(Math.random() * 100 + 500));
                    synchronized (producerAndConsumer) {
                        // Re-check in a loop: notifyAll wakes producers and consumers alike.
                        while (capacity == MAX_CAPACITY) {
                            producerAndConsumer.wait();
                        }
                        capacity++;
                        System.out.println(Thread.currentThread().getName() + "生产了一个资源,目前还剩" + capacity + "个资源");
                        producerAndConsumer.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
                // Restore the flag so code further up the stack can still see the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/** Entry point for the wait/notifyAll demonstration. */
public class Test {
    /**
     * Creates one shared buffer of capacity 10 and registers ten consumers and ten
     * producers on it, alternating consumer/producer for each index.
     */
    public static void main(String[] args) {
        final int pairs = 10;
        ProducerAndConsumer simulation = new ProducerAndConsumer(10);
        int index = 0;
        while (index < pairs) {
            simulation.putConsumer(index + "号消费者");
            simulation.putProducer(index + "号生产者");
            index++;
        }
    }
}
使用ReentrantLock实现生产者消费者
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded producer/consumer simulation coordinated with a {@link ReentrantLock} and two
 * {@link Condition}s.
 *
 * <p>A single counter models how many resources are currently in the buffer. Producer
 * threads increment it and consumer threads decrement it, always while holding the lock.
 */
class ProducerAndConsumer {
    /** Resources currently in the buffer; only accessed while holding {@link #lock}. */
    private int capacity;
    /** Upper bound for {@link #capacity}; producers wait while it is reached. */
    private final int MAX_CAPACITY;
    // Per-instance lock and conditions: static fields reassigned in the constructor would
    // let a second instance replace the lock that threads of the first one are using.
    private final ReentrantLock lock = new ReentrantLock();
    /** Consumers wait here while the buffer is empty; producers signal it. */
    private final Condition notEmpty = lock.newCondition();
    /** Producers wait here while the buffer is full; consumers signal it. */
    private final Condition notFull = lock.newCondition();

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of resources the buffer may hold; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     */
    public ProducerAndConsumer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.MAX_CAPACITY = capacity;
        this.capacity = 0;
    }

    /**
     * Starts a new consumer thread.
     *
     * @param name name given to the started thread
     */
    public void putConsumer(String name) {
        Thread consumer = new Thread(new Consumer());
        consumer.setName(name);
        consumer.start();
    }

    /**
     * Starts a new producer thread.
     *
     * @param name name given to the started thread
     */
    public void putProducer(String name) {
        Thread producer = new Thread(new Producer());
        producer.setName(name);
        producer.start();
    }

    /** Worker that repeatedly takes one resource, waiting while the buffer is empty. */
    class Consumer implements Runnable {
        /** Loops until the thread is interrupted. */
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep((int)(Math.random() * 100 + 510));
                    // Acquire OUTSIDE the inner try: if the sleep above is interrupted we
                    // never hold the lock, so unlock() must not run.
                    lock.lock();
                    try {
                        while (capacity == 0) {
                            notEmpty.await();
                        }
                        capacity--;
                        System.out.println(Thread.currentThread().getName() + "消费了一个资源,目前还剩" + capacity + "个资源");
                        // A ReentrantLock cannot be combined with wait/notify; waiting and
                        // waking go through the Condition's await/signal instead.
                        notFull.signal();
                    } finally {
                        // finally guarantees the lock is released on every path.
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                // Stop the worker and keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Worker that repeatedly adds one resource, waiting while the buffer is full. */
    class Producer implements Runnable {
        /** Loops until the thread is interrupted. */
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep((int)(Math.random() * 100 + 500));
                    // Acquire OUTSIDE the inner try so unlock() only runs when the lock is held.
                    lock.lock();
                    try {
                        while (capacity == MAX_CAPACITY) {
                            notFull.await();
                        }
                        capacity++;
                        System.out.println(Thread.currentThread().getName() + "生产了一个资源,目前还剩" + capacity + "个资源");
                        notEmpty.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                // Stop the worker and keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/** Entry point for the ReentrantLock demonstration. */
public class Test {
    /**
     * Builds a shared buffer with capacity 10, then adds a consumer and a producer for
     * each index from 0 to 9 (consumer first, producer second).
     */
    public static void main(String[] args) {
        ProducerAndConsumer sharedBuffer = new ProducerAndConsumer(10);
        int id = 0;
        do {
            String consumerName = id + "号消费者";
            String producerName = id + "号生产者";
            sharedBuffer.putConsumer(consumerName);
            sharedBuffer.putProducer(producerName);
            id++;
        } while (id < 10);
    }
}
实现阻塞队列,然后用阻塞队列实现生产者消费者模型:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Fixed-capacity FIFO blocking queue backed by a circular array and guarded by a
 * {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>{@link #put} blocks while the queue is full and {@link #get} blocks while it is
 * empty. Both methods print a progress line to standard output.
 *
 * @param <T> element type
 */
class ArrayListBlockingQueue<T> {
    // Per-instance lock and conditions: as static fields reassigned by every constructor
    // call, creating a second queue would replace the lock and conditions that threads
    // blocked on the first queue depend on.
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled by {@link #put}; {@link #get} waits on it while the queue is empty. */
    private final Condition notEmpty = lock.newCondition();
    /** Signalled by {@link #get}; {@link #put} waits on it while the queue is full. */
    private final Condition notFull = lock.newCondition();

    /** Circular buffer; all fields below are only accessed while holding {@link #lock}. */
    private final T[] taskQueue;
    /** Maximum number of elements the queue may hold. */
    private final int MAX_CAPACITY;
    /** Number of elements currently stored. */
    private int capacity;
    /** Index of the next slot to write. */
    private int tail;
    /** Index of the next slot to read. */
    private int head;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     */
    @SuppressWarnings("unchecked") // the array never escapes this class and only holds T
    public ArrayListBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        // The element count is tracked separately, so no spare slot is needed to tell
        // "full" from "empty".
        taskQueue = (T[]) new Object[capacity];
        MAX_CAPACITY = capacity;
        this.capacity = 0;
        head = 0;
        tail = 0;
    }

    /**
     * Appends {@code task}, waiting while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the element is NOT enqueued,
     * the method returns, and the thread's interrupt status is left set so the caller can
     * detect it.
     *
     * @param task element to enqueue
     */
    public void put(T task) {
        // Acquire before the try so finally only unlocks a lock that is actually held.
        lock.lock();
        try {
            while (capacity == MAX_CAPACITY) {
                notFull.await();
            }
            taskQueue[tail] = task;
            tail = (tail + 1) % taskQueue.length;
            capacity++;
            System.out.println(Thread.currentThread().getName() + "生产了一个资源,目前还剩" + capacity + "个资源");
            notEmpty.signal();
        } catch (InterruptedException e) {
            // The signature has no throws clause, so report the interrupt via the flag.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the head element, or {@code null} if the calling thread was interrupted
     *         while waiting (the interrupt status is then left set)
     */
    public T get() {
        lock.lock();
        try {
            while (capacity == 0) {
                notEmpty.await();
            }
            capacity--;
            T task = taskQueue[head];
            // Drop the array's reference so a consumed element can be garbage collected.
            taskQueue[head] = null;
            head = (head + 1) % taskQueue.length;
            System.out.println(Thread.currentThread().getName() + "消费了一个资源,目前还剩" + capacity + "个资源");
            // A ReentrantLock cannot be combined with wait/notify; waiting and waking go
            // through the Condition's await/signal instead.
            notFull.signal();
            return task;
        } catch (InterruptedException e) {
            // The signature has no throws clause, so report the interrupt via the flag.
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }
}
/**
 * Task that periodically puts a message, built from the current thread's name and
 * {@link System#nanoTime()}, into the shared queue.
 */
class Producer implements Runnable {
    /** Queue the messages are put into. */
    ArrayListBlockingQueue<String> queue;

    public Producer(ArrayListBlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** Produces one message roughly every 1.4-1.5 seconds until the thread is interrupted. */
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep((int)(Math.random() * 100 + 1400));
                queue.put("ThreadName:" + Thread.currentThread().getName() + ",time:" + System.nanoTime());
            }
        } catch (InterruptedException e) {
            // An interrupt is the request to stop: leave the loop instead of silently
            // carrying on, and keep the flag set for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}
/** Task that periodically takes a message from the shared queue and prints it. */
class Consumer implements Runnable {
    /** Queue the messages are taken from. */
    ArrayListBlockingQueue<String> queue;

    public Consumer(ArrayListBlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** Consumes one message roughly every 1.5-1.6 seconds until the thread is interrupted. */
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep((int)(Math.random() * 100 + 1510));
                String message = queue.get();
                if (message == null && Thread.currentThread().isInterrupted()) {
                    // No message was obtained and an interrupt is pending: stop without
                    // reporting a bogus "null" message.
                    return;
                }
                System.out.println(Thread.currentThread().getName() + "消费了消息--> \"" + message + "\"");
            }
        } catch (InterruptedException e) {
            // An interrupt is the request to stop: leave the loop instead of silently
            // carrying on, and keep the flag set for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}
/** Entry point for the blocking-queue demonstration. */
public class Test {
    /**
     * Creates one queue of capacity 10 and starts ten consumer threads and ten producer
     * threads on it; for each index the consumer is started before the producer.
     */
    public static void main(String[] args) {
        ArrayListBlockingQueue<String> sharedQueue = new ArrayListBlockingQueue<>(10);
        int id = 0;
        while (id < 10) {
            Runnable consumerTask = new Consumer(sharedQueue);
            Runnable producerTask = new Producer(sharedQueue);
            // Naming through the constructor is equivalent to calling setName before start.
            Thread consumerThread = new Thread(consumerTask, id + "号消费者");
            Thread producerThread = new Thread(producerTask, id + "号生产者");
            consumerThread.start();
            producerThread.start();
            id++;
        }
    }
}