实现任务处理线程ProcessThread和实现任务处理线程池Pool(Python)
关于
ProcessThread
- 任务处理线程需要不断地从任务队列里取任务执行;
- 任务处理线程需要有一个标记,标记线程什么时候应该停止。
Pool
- 存放多个任务处理线程;
- 负责多个线程的启停;
- 管理向线程池的提交任务,下发给线程去执行。
实现的基本功能(过程)
ProcessThread
- 基本属性(任务队列、标记)
- 线程执行的逻辑(run)
- 线程停止(stop)
Pool
- 基本属性
- 提交任务(put,batch_put)
- 线程启停(start,join)
- 线程池大小(size)
具体实现
# -*- encoding=utf-8 -*-
import psutil
import threading
from operate_system.task import Task, AsyncTask
from operate_system.queue import ThreadSafeQueue
# 任务处理线程
class ProcessThread(threading.Thread):
def __init__(self, task_queue, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
# 线程停止的标记
self.dismiss_flag = threading.Event()
# 任务队列(处理线程不断从队列取出元素处理)
self.task_queue = task_queue
self.args = args
self.kwargs = kwargs
def run(self):
while True:
# 判断线程是否被要求停止
if self.dismiss_flag.is_set():
break
task = self.task_queue.pop()
if not isinstance(task, Task):
continue
# 执行task实际逻辑(是通过函数调用引进来的)
result = task.callable(*task.args, **task.kwargs)
if isinstance(task, AsyncTask):
task.set_result(result)
def dismiss(self):
self.dismiss_flag.set()
def stop(self):
self.dismiss()
# 线程池
class ThreadPool:
def __init__(self, size=0):
if not size:
# 约定线程池的大小为CPU核数的两倍(最佳实践)
size = psutil.cpu_count() * 2
# 线程池
self.pool = ThreadSafeQueue(size)
# 任务队列
self.task_queue = ThreadSafeQueue()
for i in range(size):
self.pool.put(ProcessThread(self.task_queue))
# 启动线程池
def start(self):
for i in range(self.pool.size()):
thread = self.pool.get(i)
thread.start()
# 停止线程池
def join(self):
for i in range(self.pool.size()):
thread = self.pool.get(i)
thread.stop()
while self.pool.size():
thread = self.pool.pop()
thread.join()
# 往线程池提交任务
def put(self, item):
if not isinstance(item, Task):
raise TaskTypeErrorException()
self.task_queue.put(item)
# 批量提交
def batch_put(self, item_list):
if not isinstance(item_list, list):
item_list = list(item_list)
for item in item_list:
self.put(item)
def size(self):
return self.pool.size()
class TaskTypeErrorException(Exception):
pass