实现任务处理线程ProcessThread和实现任务处理线程池Pool(Python)

 关于

ProcessThread

  • 任务处理线程需要不断地从任务队列里取任务执行;
  • 任务处理线程需要有一个标记,标记线程什么时候应该停止。

Pool

  • 存放多个任务处理线程;
  • 负责多个线程的启停;
  • 管理向线程池的提交任务,下发给线程去执行。

实现的基本功能(过程)

ProcessThread

  1. 基本属性(任务队列、标记)
  2. 线程执行的逻辑(run)
  3. 线程停止(stop)

Pool

  1. 基本属性
  2. 提交任务(put,batch_put)
  3. 线程启停(start,join)
  4. 线程池大小(size)

具体实现

# -*- encoding=utf-8 -*-


import psutil
import threading

from operate_system.task import Task, AsyncTask
from operate_system.queue import ThreadSafeQueue


# 任务处理线程
class ProcessThread(threading.Thread):

    def __init__(self, task_queue, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        # 线程停止的标记
        self.dismiss_flag = threading.Event()
        # 任务队列(处理线程不断从队列取出元素处理)
        self.task_queue = task_queue
        self.args = args
        self.kwargs = kwargs

    def run(self):
        while True:
            # 判断线程是否被要求停止
            if self.dismiss_flag.is_set():
                break

            task = self.task_queue.pop()
            if not isinstance(task, Task):
                continue
            # 执行task实际逻辑(是通过函数调用引进来的)
            result = task.callable(*task.args, **task.kwargs)
            if isinstance(task, AsyncTask):
                task.set_result(result)


    def dismiss(self):
        self.dismiss_flag.set()

    def stop(self):
        self.dismiss()


# 线程池
class ThreadPool:

    def __init__(self, size=0):
        if not size:
            # 约定线程池的大小为CPU核数的两倍(最佳实践)
            size = psutil.cpu_count() * 2
        # 线程池
        self.pool = ThreadSafeQueue(size)
        # 任务队列
        self.task_queue = ThreadSafeQueue()

        for i in range(size):
            self.pool.put(ProcessThread(self.task_queue))

    # 启动线程池
    def start(self):
        for i in range(self.pool.size()):
            thread = self.pool.get(i)
            thread.start()

    # 停止线程池
    def join(self):
        for i in range(self.pool.size()):
            thread = self.pool.get(i)
            thread.stop()
        while self.pool.size():
            thread = self.pool.pop()
            thread.join()

    # 往线程池提交任务
    def put(self, item):
        if not isinstance(item, Task):
            raise TaskTypeErrorException()
        self.task_queue.put(item)

    # 批量提交
    def batch_put(self, item_list):
        if not isinstance(item_list, list):
            item_list = list(item_list)
        for item in item_list:
            self.put(item)

    def size(self):
        return self.pool.size()


class TaskTypeErrorException(Exception):
    pass

 

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务