实现线程安全的队列Queue(Python)

关于队列

        用于存放多个元素,是存放各种元素的“池”。


实现的基本功能

        获取当前队列元素数量-->往队列放入元素-->往队列取出元素。

注意

        队列可能有多个线程同时操作,因此需要保证线程安全,如下两种情况:

具体实现

# -*- encoding=utf-8 -*-


import time
import threading


class ThreadSafeQueueException(Exception):
    pass


# 线程安全的队列
class ThreadSafeQueue(object):

    def __init__(self, max_size=0):
        self.queue = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.condition = threading.Condition()

    # 当前队列元素的数量
    def size(self):
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
        return size

    # 往队列里面放入元素
    def put(self, item):
        if self.max_size != 0 and self.size() > self.max_size:
            return ThreadSafeQueueException()
        self.lock.acquire()
        self.queue.append(item)
        self.lock.release()
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()
        pass

    def batch_put(self, item_list):
        if not isinstance(item_list, list):
            item_list = list(item_list)
        for item in item_list:
            self.put(item)

    # 从队列取出元素
    def pop(self, block=True, timeout=None):
        if self.size() == 0:
            # 需要阻塞等待
            if block:
                self.condition.acquire()
                self.condition.wait(timeout=timeout)
                self.condition.release()
            else:
                return None
        self.lock.acquire()
        item = None
        if len(self.queue) > 0:
            item = self.queue.pop()
        self.lock.release()
        return item

    def get(self, index):
        self.lock.acquire()
        item = self.queue[index]
        self.lock.release()
        return item


if __name__ == '__main__':
    queue = ThreadSafeQueue(max_size=100)

    def producer():
        while True:
            queue.put(1)
            time.sleep(3)

    def consumer():
        while True:
            item = queue.pop(block=True, timeout=-1)
            print('get item from queue: %d' % item)
            time.sleep(1)

    thread1 = threading.Thread(target=producer)
    thread2 = threading.Thread(target=consumer)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

 

全部评论

相关推荐

MScoding:你这个实习有一个是当辅导老师,这个和找技术岗没有关系吧?
点赞 评论 收藏
分享
2024-12-27 23:45
已编辑
三江学院 Java
程序员牛肉:死局。学历+无实习+项目比较简单一点。基本就代表失业了。 尤其是项目,功能点实在是太假了。而且提问点也很少。第一个项目中的使用jwt和threadlocal也可以作为亮点写出来嘛?第二个项目中的“后端使用restful风格”,“前端采用vue.JS”,“使用redis”也可以作为亮点嘛? 项目实在是太简单了,基本就是1+1=2的水平。而你目标投递的肯定也是小厂,可小厂哪里有什么培养制度,由于成本的问题,人家更希望你来能直接干活,所以你投小厂也很难投。基本就是死局,也不一定非要走后端这条路。可以再学一学后端之后走测试或者前端。 除此之外,不要相信任何付费改简历的。你这份简历没有改的必要了,先沉淀沉淀
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务