生产者消费者模型

用wait和notifyAll实现:

/**
 * Producer/consumer demo coordinated with intrinsic locking
 * ({@code synchronized} / {@code wait} / {@code notifyAll}).
 *
 * <p>{@code capacity} counts the resources currently available and never
 * leaves the range {@code 0..MAX_CAPACITY}: consumers wait while it is zero,
 * producers wait while it equals {@code MAX_CAPACITY}.
 */
class ProducerAndConsumer{
    // Resources currently available; modified only inside synchronized blocks.
    private volatile int capacity;
    // Upper bound for capacity, set once by the constructor.
    private int MAX_CAPACITY;

    /**
     * @param capacity maximum number of resources that may exist at once
     */
    public ProducerAndConsumer(int capacity) {
        this.capacity = 0;
        this.MAX_CAPACITY = capacity;
    }

    /**
     * Starts a new consumer thread.
     *
     * @param name name given to the thread; it appears in the printed output
     */
    public void putConsumer(String name){
        // The enclosing object is handed over to serve as the monitor.
        launch(new Consumer(this), name);
    }

    /**
     * Starts a new producer thread.
     *
     * @param name name given to the thread; it appears in the printed output
     */
    public void putProducer(String name) {
        launch(new Producer(this), name);
    }

    // Wraps the job in a thread, names it and starts it.
    private void launch(Runnable job, String name) {
        Thread worker = new Thread(job);
        worker.setName(name);
        worker.start();
    }

    /** Endlessly takes one resource at a time until the thread is interrupted. */
    class Consumer implements Runnable{
        // Object whose monitor guards the shared counter.
        ProducerAndConsumer producerAndConsumer;

        public Consumer(ProducerAndConsumer producerAndConsumer){
            this.producerAndConsumer = producerAndConsumer;
        }

        public void run() {
            for (;;) {
                try {
                    long pauseMillis = (int)(Math.random() * 100 + 510);
                    Thread.sleep(pauseMillis);
                    synchronized (producerAndConsumer) {
                        // Re-check after every wake-up: another consumer may have won the race.
                        while (0 == capacity) {
                            producerAndConsumer.wait();
                        }
                        capacity--;
                        String who = Thread.currentThread().getName();
                        System.out.println(who + "消费了一个资源,目前还剩" + capacity + "个资源");
                        producerAndConsumer.notifyAll();
                    }
                } catch (InterruptedException interrupted) {
                    // An interrupt ends this worker after reporting the reason.
                    System.out.println(interrupted.getMessage());
                    return;
                }
            }
        }
    }

    /** Endlessly adds one resource at a time until the thread is interrupted. */
    class Producer implements Runnable{
        // Object whose monitor guards the shared counter.
        ProducerAndConsumer producerAndConsumer;

        public Producer(ProducerAndConsumer producerAndConsumer){
            this.producerAndConsumer = producerAndConsumer;
        }

        public void run(){
            for (;;) {
                try {
                    long pauseMillis = (int)(Math.random() * 100 + 500);
                    Thread.sleep(pauseMillis);
                    synchronized (producerAndConsumer) {
                        // Re-check after every wake-up: another producer may have filled the last slot.
                        while (MAX_CAPACITY == capacity) {
                            producerAndConsumer.wait();
                        }
                        capacity++;
                        String who = Thread.currentThread().getName();
                        System.out.println(who + "生产了一个资源,目前还剩" + capacity + "个资源");
                        producerAndConsumer.notifyAll();
                    }
                } catch (InterruptedException interrupted) {
                    // An interrupt ends this worker after reporting the reason.
                    System.out.println(interrupted.getMessage());
                    return;
                }
            }
        }
    }
}

/** Demo entry point: starts ten consumer/producer pairs sharing one bounded resource pool. */
public class Test {
    public static void main(String[] args) {
        final int pairCount = 10;
        ProducerAndConsumer pool = new ProducerAndConsumer(10);
        int index = 0;
        while (index < pairCount) {
            // The consumer of each pair is started before its producer.
            pool.putConsumer(index + "号消费者");
            pool.putProducer(index + "号生产者");
            index++;
        }
    }
}

使用ReentrantLock实现生产者消费者

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo coordinated by a {@link ReentrantLock} and two
 * {@link Condition}s.
 *
 * <p>{@code capacity} counts the resources currently available and is bounded
 * by {@code MAX_CAPACITY}. Consumers wait on {@code isEmpty} while nothing is
 * available; producers wait on {@code isFull} while the bound is reached.
 * Interrupting a worker thread makes it stop, with its interrupt status set.
 */
class ProducerAndConsumer{
    // Lock and conditions are per instance, so separate ProducerAndConsumer
    // objects never share or replace each other's lock.
    private final ReentrantLock lock = new ReentrantLock();
    // Awaited by consumers while capacity == 0; signalled after a resource is produced.
    private final Condition isEmpty = lock.newCondition();
    // Awaited by producers while capacity == MAX_CAPACITY; signalled after a resource is consumed.
    private final Condition isFull = lock.newCondition();
    // Resources currently available; read and written only while holding lock.
    private int capacity;
    // Upper bound for capacity.
    private final int MAX_CAPACITY;

    /**
     * @param capacity maximum number of resources that may exist at once
     */
    public ProducerAndConsumer(int capacity) {
        this.MAX_CAPACITY = capacity;
        this.capacity = 0;
    }

    /**
     * Starts a new consumer thread.
     *
     * @param name name given to the thread; it appears in the printed output
     */
    public void putConsumer(String name){
        Thread consumer = new Thread(new Consumer());
        consumer.setName(name);
        consumer.start();
    }

    /**
     * Starts a new producer thread.
     *
     * @param name name given to the thread; it appears in the printed output
     */
    public void putProducer(String name) {
        Thread producer = new Thread(new Producer());
        producer.setName(name);
        producer.start();
    }

    /** Repeatedly takes one resource until the thread is interrupted. */
    class Consumer implements Runnable{
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep((int)(Math.random() * 100 + 510));
                    // Acquire the lock outside the inner try so that unlock() in the
                    // finally block only ever runs while the lock is actually held.
                    lock.lock();
                    try {
                        while(capacity == 0) {
                            isEmpty.await();
                        }
                        capacity--;
                        System.out.println(Thread.currentThread().getName() + "消费了一个资源,目前还剩" + capacity + "个资源");
                        // With a ReentrantLock, waiting and waking go through Condition
                        // await/signal rather than Object wait/notify.
                        isFull.signal();
                    } finally {
                        // finally guarantees the lock is released on every path.
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status and let the worker end.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Repeatedly adds one resource until the thread is interrupted. */
    class Producer implements Runnable{
        @Override
        public void run(){
            try {
                while (true){
                    Thread.sleep((int)(Math.random() * 100 + 500));
                    // Acquire the lock outside the inner try so that unlock() in the
                    // finally block only ever runs while the lock is actually held.
                    lock.lock();
                    try {
                        while(capacity == MAX_CAPACITY) {
                            isFull.await();
                        }
                        capacity++;
                        System.out.println(Thread.currentThread().getName() + "生产了一个资源,目前还剩" + capacity + "个资源");
                        isEmpty.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status and let the worker end.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/** Demo entry point: starts ten consumer/producer pairs on one lock-based resource pool. */
public class Test {
    public static void main(String[] args) {
        final int pairCount = 10;
        ProducerAndConsumer pool = new ProducerAndConsumer(10);
        int index = 0;
        while (index < pairCount) {
            // The consumer of each pair is started before its producer.
            pool.putConsumer(index + "号消费者");
            pool.putProducer(index + "号生产者");
            index++;
        }
    }
}

实现阻塞队列,然后用阻塞队列实现生产者消费者模型:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Fixed-capacity blocking FIFO queue stored in a circular array and guarded by
 * a {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>{@link #put} blocks while the queue is full and {@link #get} blocks while
 * it is empty. Neither method declares {@code InterruptedException}: when the
 * calling thread is interrupted while waiting, the call gives up, restores the
 * thread's interrupt status and returns ({@code get} returns {@code null}).
 *
 * @param <T> element type
 */
class ArrayListBlockingQueue<T>{
    // Lock and conditions are per instance, so separate queues never share or
    // replace each other's lock.
    private final ReentrantLock lock = new ReentrantLock();
    // Awaited by get() while the queue is empty; signalled after every put().
    private final Condition isEmpty = lock.newCondition();
    // Awaited by put() while the queue is full; signalled after every get().
    private final Condition isFull = lock.newCondition();
    // Circular buffer holding the elements.
    private final T[] taskQueue;
    // Maximum number of elements.
    private final int MAX_CAPACITY;
    // The fields below are read and written only while holding lock.
    private int capacity; // number of elements currently stored
    private int tail;     // index of the next slot to write
    private int head;     // index of the next slot to read

    /**
     * @param capacity maximum number of elements the queue can hold
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public ArrayListBlockingQueue(int capacity){
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        // Safe: the array is only ever accessed as T[] inside this class.
        @SuppressWarnings("unchecked")
        T[] slots = (T[]) new Object[capacity];
        taskQueue = slots;
        MAX_CAPACITY = capacity;
        this.capacity = 0;
        tail = 0;
        head = 0;
    }

    /**
     * Appends {@code task}, waiting while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the element is
     * not added and the interrupt status is restored.
     *
     * @param task element to append
     */
    public void put(T task) {
        // Lock before entering try so that finally only unlocks a held lock.
        lock.lock();
        try{
            while(capacity == MAX_CAPACITY) {
                isFull.await();
            }
            taskQueue[tail] = task;
            tail = (tail + 1) % taskQueue.length;
            capacity++;
            System.out.println(Thread.currentThread().getName() + "生产了一个资源,目前还剩" + capacity + "个资源");
            isEmpty.signal();
        }catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the removed element, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt status is then restored)
     */
    public T get() {
        // Lock before entering try so that finally only unlocks a held lock.
        lock.lock();
        try {
            while(capacity == 0) {
                isEmpty.await();
            }
            capacity--;
            T task = taskQueue[head];
            // Drop the reference so a consumed element is not retained by the buffer.
            taskQueue[head] = null;
            head = (head + 1) % taskQueue.length;
            System.out.println(Thread.currentThread().getName() + "消费了一个资源,目前还剩" + capacity + "个资源");
            // With a ReentrantLock, waiting and waking go through Condition
            // await/signal rather than Object wait/notify.
            isFull.signal();
            return task;
        }catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return null;
        }finally {
            lock.unlock();
        }
    }
}

/**
 * Worker that, after a randomised pause, puts a message containing its thread
 * name and a {@code System.nanoTime()} stamp into the shared queue, and repeats
 * until its thread is interrupted.
 */
class Producer implements Runnable{
    ArrayListBlockingQueue<String> queue;

    /**
     * @param queue queue that receives the produced messages
     */
    public Producer(ArrayListBlockingQueue<String> queue){
        this.queue = queue;
    }

    @Override
    public void run(){
        // Stop once interrupted so the thread can be shut down from outside.
        while (!Thread.currentThread().isInterrupted()) {
            try{
                Thread.sleep((int)(Math.random() * 100 + 1400));
                queue.put("ThreadName:" + Thread.currentThread().getName() + ",time:" +  System.nanoTime());
            }catch (InterruptedException e) {
                // Restore the interrupt status and end the worker.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Worker that, after a randomised pause, takes one message from the shared
 * queue and prints it, and repeats until its thread is interrupted.
 */
class Consumer implements Runnable{
    ArrayListBlockingQueue<String> queue;

    /**
     * @param queue queue from which messages are taken
     */
    public Consumer(ArrayListBlockingQueue<String> queue){
        this.queue = queue;
    }

    @Override
    public void run(){
        // Stop once interrupted so the thread can be shut down from outside.
        while (!Thread.currentThread().isInterrupted()) {
            try{
                Thread.sleep((int)(Math.random() * 100 + 1510));
                System.out.println(Thread.currentThread().getName() + "消费了消息--> \"" + queue.get() + "\"");
            }catch (InterruptedException e) {
                // Restore the interrupt status and end the worker.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/** Demo entry point: starts ten consumer/producer thread pairs on one shared blocking queue. */
public class Test {
    public static void main(String[] args) {
        final int pairCount = 10;
        ArrayListBlockingQueue<String> sharedQueue = new ArrayListBlockingQueue<>(10);
        int index = 0;
        while (index < pairCount) {
            Thread taker = new Thread(new Consumer(sharedQueue));
            Thread maker = new Thread(new Producer(sharedQueue));
            taker.setName(index + "号消费者");
            maker.setName(index + "号生产者");
            // The consumer of each pair is started before its producer.
            taker.start();
            maker.start();
            index++;
        }
    }
}
全部评论

相关推荐

Noob1024:一笔传三代,人走笔还在
点赞 评论 收藏
分享
评论
点赞
1
分享
牛客网
牛客企业服务