Python系统编程-多进程和多线程

1. 进程和线程的简单解释

参考here

  1. 计算机的核心是CPU,它承担了所有的计算任务,就像一座工厂,时刻在运行。
  2. 假定工厂的电力有限,一次只能供给一个车间使用,也就是说一个车间开工的时候,其他车间都必须停工。背后的含义就是,单个CPU一次只能运行一个任务。
  3. 进程就好比工厂的车间,它代表CPU所能处理的单个任务。任一时刻,CPU总是运行一个进程,其他进程处于非运行状态。
  4. 一个车间里面,可以有很多工人,他们协同完成一个任务。
  5. 线程就好比车间里面的工人,一个进程可以包括多个线程。
  6. 车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的,这象征一个进程的内存空间是共享的,每个线程都可以使用这些共享内存。
  7. 可是,每个房间的大小不同,有的房间最多容纳一个人,比如厕所。里面有人的时候,其他人就进不去了。这代表一个进程使用某些共享内容的时候,其他线程必须等它结束,才能使用这一块内存。
  8. 一个防止他人进入的简单方法,就是在门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫“互斥锁”,简写为Mutex,防止多个线程同时读写某一块内存区域。。
  9. 还有一些房间,可以同时容纳n个人,比如厨房,也就是说,如果人数大于n,多出来的人就只能在外面等着。这就好比某些内存区域,只能供给固定数目的线程使用。
  10. 这时的解决办法,就是在门口挂n把锁。进去的人就取一把钥匙,出来时再把要是挂回到远处。后面的人看到没钥匙的时候,就知道必须在门口排队等着了。这种做法叫做“信号量”,用来保证多个线程不会互相冲突。
    可以看出,互斥锁是信号量的一种特殊情况,也就是说,完全可以使用信号量代替互斥锁。但是互斥锁比较简单,并且效率高,所以必须在保证资源独占的情况下,采用这种设计。
    总结:
    操作系统的设计,可以归纳为3点:
    1)以多进程形式,允许多个任务同时进行;
    2)以多线程形式,允许单个任务分成不同的部分运行;
    3)提供协调机制,一方面防止进程之间和线程之间产生冲突,另一方面允许进程之间和线程之间共享资源。

2. 进程

2.1. 多进程-multiprocess

python的多进程编程主要依靠multiprocess模块,首先对比两段代码,第一段代码使用单进程计算8的20次方,并sleep 2秒,重复2次,输出总的耗时。

import time
import os

def task():
    print("当前进程 %s" % os.getpid())
    time.sleep(2)
    print("结果为 ", 8**20)

if __name__ == '__main__':
    print("当前母进程为 %s" % os.getpid())
    start = time.time()
    for i in range(2):
        task()
    end = time.time()
    time = end - start

    print("用时 %s" % time)
当前母进程为 16052
当前进程 16052
结果为  1152921504606846976
当前进程 16052
结果为  1152921504606846976
用时 4.001027584075928

从输出结果可以看出,总共耗时4秒多,使用只存在一个进程。

使用多进程实现上面的功能,使用multiprocess模块的process方法,该方法可以创建两个新的进程来进行并行计算。Process方法接受两个参数,第一个是target,指向函数名,第二个是args,需要向函数传递的参数。对于创建的新进程,调用start()方法就可以开始进程,可以使用getpid打印当前进程的名字。

p = Process(target=XXX,args=(tuple,),kwargs={key:value})
target = XXX 指定的任务函数,不用加(),
args=(tuple,)kwargs={key:value}给任务函数传递的参数
from multiprocessing import Process
import time
import os


def task(i, m=None):
    print("子进程 %s, 任务%s, m值为%s" % (os.getpid(), i, m))
    time.sleep(2)
    print("结果为 ", 8 ** 20)


if __name__ == '__main__':
    print("当前母进程为 %s" % os.getpid())
    start = time.time()
    p1 = Process(target=task, args=(1,), kwargs={'m': 2})
    p2 = Process(target=task, args=(2,), kwargs={'m': 3})
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    time = end - start
    print("用时 %s" % time)
当前母进程为 16712
子进程 19848, 任务1, m值为2
子进程 9412, 任务2, m值为3
结果为  1152921504606846976
结果为  1152921504606846976
用时 2.1138699054718018

需要在pycharm中运行程序,在notebook jupyter中运行程序得不到上面的结果。 通过结果可以发现,耗时变为了2秒多,可见并发执行的时间明显比顺序执行的快很多。使用join方法是为了让母进程阻塞,等待子进程都完成之后才能继续执行后面的代码。

join 该方法可以等待子进程结束后再继续往下运行,进一步解释为,哪个进程调用了join方法,主进程就要等待子进程执行完后才能继续向下执行。

不使用p1.join(), p2.join()的结果,此时输出的知识母进程的耗时。

当前母进程为 10056
用时 0.013927936553955078
子进程 6976, 任务1
子进程 11592, 任务2
结果为  1152921504606846976
结果为  1152921504606846976

2.1.1. Process方法:

  1. is_alive():返回进程是否在运行,bool类型。

  2. join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  3. start():进程准备就绪,等待CPU调度

  4. run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  5. terminate():不管任务是否完成,立即停止工作进程

  6. exitcode:如果进程尚未终止,返回None。
    如果exitcode的值为:

     == 0: 没有错误正常退出  
     > 0: 进程有错误,并以此状态码退出  
     < 0: 进程被 -1 * 的信号杀死并以此作为 ExitCode 退出  

如下所示,terminate只是停止一个进程,并没有完全杀死进程,需要在terminate之后使用join来等待进程结束。

# 杀死一个进程
import multiprocessing
import time

def foo():
    print('Starting function')
    time.sleep(0.1)
    print('Finished function')

if __name__ == '__main__':
    p = multiprocessing.Process(target=foo)
    print('Process before execution:', p, p.is_alive())
    p.start()
    print('Process running:', p, p.is_alive())
    p.terminate()
    print('Process terminated:', p, p.is_alive())
    print('Process exit code:', p.exitcode)
    p.join()
    print('Process joined:', p, p.is_alive())
    print('Process exit code:', p.exitcode)
Process before execution: <Process(Process-1, initial)> False
Process running: <Process(Process-1, started)> True
Process terminated: <Process(Process-1, started)> True
Process exit code: None
Process joined: <Process(Process-1, stopped[SIGTERM])> False
Process exit code: -15

2.2. 进程之间通信 Queue, Pipe

主进程与子进程是并发执行的,进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值)。如果要共享全局变量需要用(multiprocessing.Value("d",10.0),数值)(multiprocessing.Array("i",[1,2,3,4,5]),数组)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(range(5)))。可以参考这里here

进程通信(进程之间传递数据)用进程队列(multiprocessing.Queue(),单向通信),管道( multiprocessing.Pipe() ,双向通信)。

2.2.1. Queue

参考here1
方法:

  1. put(obj[, block, timeout])
    将 obj 放入队列,其中当 block 参数设为 True 时,一旦队列被写满,则代码就会被阻塞,直到有进程取走数据并腾出空间供 obj 使用。timeout 参数用来设置阻塞的时间,即程序最多在阻塞 timeout 秒之后,如果还是没有空闲空间,则程序会抛出 queue.Full 异常。
  2. get(block[, timeout])
    获取队列中的一条消息,然后将其从队列中移除。当 block 为 True 且 timeout 为 None 时,该方***阻塞当前进程,直到队列中有可用的数据。如果 block 设为 False,则进程会直接做取数据的操作,如果取数据失败,则抛出 queue.Empty 异常(这种情形下 timeout 参数将不起作用)。如果手动 timeout 秒数,则当前进程最多被阻塞 timeout 秒,如果到时依旧没有可用的数据取出,则会抛出 queue.Empty 异常。
# 在主进程中创建两个子进程,一个用来向queue中写数据,
# 另一个从queue中读数据。
from multiprocessing import Queue, Process
import os
import random
import time


def write(q):
    print("process to write", os.getpid())
    for i in ['A','B','C']:
        print('Put %s to queue...' % i)
        q.put(i)
        time.sleep(random.random())


def read(q):
    print("process to read", os.getpid())
    while True:
        if not q.empty():
            value = q.get()
            print('Get %s from queue.' % value)
        else:
            break


if __name__ == '__main__':
    # 父进程创建Queue,然后传递给各个子进程
    q = Queue()
    p1 = Process(target=write, args=(q,))
    p2 = Process(target=read, args=(q,))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
process to write 14132
Put A to queue...
Put B to queue...
Put C to queue...
process to read 5708
Get A from queue.
Get B from queue.
Get C from queue.

上述代码中,首先执行写数据,然后等待写数据进程的结束,然后开始读数据,在读数据中判断队列是否为空来退出程序,然后终止读数据进程。

2.2.2. Pipe

参考here
Pipe 直译过来的意思是“管”或“管道”,该种实现多进程编程的方式,和实际生活中的管(管道)是非常类似的。通常情况下,管道有 2 个口,而 Pipe 也常用来实现 2 个进程之间的通信,这 2 个进程分别位于管道的两端,一端用来发送数据,另一端用来接收数据。

使用 Pipe 实现进程通信,首先需要调用 multiprocessing.Pipe() 函数来创建一个管道。该函数的语法格式如下:

conn1, conn2 = multiprocessing.Pipe( [duplex=True] )

其中,conn1 和 conn2 分别用来接收 Pipe 函数返回的 2 个端口;duplex 参数默认为 True,表示该管道是双向的,即位于 2 个端口的进程既可以发送数据,也可以接受数据,而如果将 duplex 值设为 False,则表示管道是单向的,conn1 只能用来接收数据,而 conn2 只能用来发送数据。

  1. send(obj) 发送一个 obj 给管道的另一端,另一端使用 recv() 方法接收。需要说明的是,该 obj 必须是可序列化的,如果该对象序列化之后超过 32MB,则很可能会引发 ValueError 异常。
  2. recv() 接收另一端通过 send() 方法发送过来的数据。
# 实现,conn1发送数据,conn2接受数据。
from multiprocessing import Process, Pipe
import multiprocessing
import os


def processFun(conn,name):
    print(os.getpid(), "进程发送数据:", name)
    conn.send(name)


if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p = Process(target=processFun, args=(conn1, 'hello world!!!'))
    p.start()
    p.join()

    print(os.getpid(), '接受数据', conn2.recv())

2.3. 进程池Pool

Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果进程池还没有满,那么就会创建一个新的进程来执行该请求;但如果池中的进程数已经达到规定的最大值,那么请求就会等待,直到池中有进程结束,才会创建新的进程来添加到进程池中。

方法

  • close():防止任何更多的任务被提交到池中。 一旦完成所有任务,工作进程将退出。
  • terminate():立即停止工作进程而不完成未完成的工作。当池对象被垃圾收集时,terminate()将立即调用。
  • join():等待工作进程退出。必须打电话close()或 terminate()使用之前join()。

2.3.1. 进程池简单使用(非租塞)

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(2)
    print("end:", msg)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = 'hello %s' % str(i)
        pool.apply_async(func, (msg,))

    print('mark....')
    pool.close()
    pool.join()
    print(".....")
mark....
msg: hello 0
msg: hello 1
msg: hello 2
end: hello 0
msg: hello 3
end: hello 1
end: hello 2
end: hello 3
.....

说明:首先创建一个进程数量为3的进程池,然后往进程池中添加4个对象,其中3个进程会直接执行,第4个对象会等待,等待进程池中有进程结束的时候,才创建新的进程执行它。从上面的执行结果可以看出,当end hello 0之后,就创建了新的进程msg hello 3. 因为是非阻塞的(apply_async),所以主程序会自己执行,所以运行完for循环之后,直接输出mark。pool.close()防止更多的任务被提交到进程池中。一旦完成所有任务,工作进程将推出。pool.join()会等待工作的进程退出。如果去掉pool.join(),此时在pycharm中只会打印主程序的结果

mark....
.....

此时没有打印进程的内容,因为进程还未结束的时候就退出了程序。

当在pool.join()前面使用pool.terminal()方法时,此时会终止正在执行的进程,然后接着执行主程序,此时的打印结果为:

mark....
.....

2.3.2. 进程池(阻塞)

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(2)
    print("end:", msg)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = 'hello %s' % str(i)
        pool.apply(func, (msg,))

    print('mark....')
    pool.close()
    pool.join()
    print(".....")
msg: hello 0
end: hello 0
msg: hello 1
end: hello 1
msg: hello 2
end: hello 2
msg: hello 3
end: hello 3
mark....
.....

2.3.3. 阻塞和非阻塞模式区别

参考here
从上面的结果可以比较清楚的看出:

  1. apply方法是阻塞的,意思就是等待当前子进程执行完毕后,在执行下一个进程。首先执行完4个子进程,然后执行主程序部分。(这样是不是意味着使用apply的进程池的进程个数只用申请1个就可以了?)
  2. apply_async方法是非阻塞的,也就是不用等待当前进程执行完毕,根据系统调度来进行进程切换。如上面的例子,当不使用pool.join()来等待子进程结束的时候,从输出可以看出只执行了主程序的内容。这是因为,主程序太短,CPU运行很快,完全没有给操作系统进程切换的机会,主程序就运行完毕了,然后整个程序结束。子进程完全没有切换到程序就已经结束了。

2.3.4. 关注进程执行的结果

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(2)
    print("end:", msg)
    return "done %s" % msg


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    result = []
    for i in range(4):
        msg = 'hello %s' % str(i)
        res = pool.apply_async(func, (msg,))
        result.append(res)

    print('mark....')
    pool.close()
    pool.join()
    for item in result:
        print("***", item.get(),"****")
    print(".....")
mark....
msg: hello 0
msg: hello 1
msg: hello 2
end: hello 0
msg: hello 3
end: hello 1
end: hello 2
end: hello 3
*** done hello 0 ****
*** done hello 1 ****
*** done hello 2 ****
*** done hello 3 ****
.....

说明:将所有进程执行结果保存在result中,然后在主程序中输出。

2.3.5. 进程池中的Queue

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到如下的错误信息:

RuntimeError: Queue objects should only be shared between processs through inheritance

从上面的Queue代码中可以看出,创建了两个进程,一个用来读,一个用来写数据,当使用进程池的时候就不用这么麻烦了,如下所示:

from multiprocessing import Manager, Pool
import os
import random
import time


def write(q):
    print("process to write", os.getpid())
    for i in ['A','B','C']:
        print('Put %s to queue...' % i)
        q.put(i)
        time.sleep(random.random())


def read(q):
    print("process to read", os.getpid())
    while True:
        if not q.empty():
            value = q.get()
            print('Get %s from queue.' % value)
        else:
            break


if __name__ == '__main__':
    # 父进程创建Queue,然后传递给各个子进程
    q = Manager().Queue()
    po = Pool()
    po.apply(write, (q,))
    po.apply(read, (q,))
    po.close()
    po.join()
    print("......")
process to write 17884
Put A to queue...
Put B to queue...
Put C to queue...
process to read 10680
Get A from queue.
Get B from queue.
Get C from queue.
......

2.4. 僵尸进程和孤儿进程

  • 孤儿进程:父进程退出后,子进程还在运行的这些进程就是孤儿进程。比如上面的2.1中的例子,没有调用join方法的时候,主进程先执行完毕,然后退出,此时的子进程还在运行。
  • 僵尸进程:被使用terminate方法强制终止的进程为僵尸进程。只能等待主程序结束,这个僵尸进程才算消失。

如何避免僵尸进程??

  1. 使用wait调用来读取子进程状态,在multiprocessing.Process产出的进程可以通过子进程调用join()方法来wait。也就是在子进程调用terminate的后面调用join方法。
  2. 结束父进程。当父进程结束后,僵尸进程也随之被消除。

3. 线程

主要参考here
在python3中,通过threading模块提供线程的功能,threading模块里面有个Thread类,是模块的最主要的线程类。

Threading模块的方法和属性:

  1. active_count(): 返回当前活跃的线程数,1个主线程+n个子线程。
  2. current_thread() 返回当前线程

Thread类中的方法和属性:

  1. start(),启动线程,等待CPU调度。
  2. run(),线程被CPU调度后自动执行的方法。
  3. join([timeout]), 调用该方法将会使主线程堵塞,直到调用该方法的线程运行借宿或者超时。timeout是一个数值型参数,表示一个时间,当超过该时间之后,调用该方法的线程还未完成,那么将直接开始执行主线程。如果没有提供这个参数,那么主线程将一直堵塞到子线程结束。
  4. setDaemon() 设置为后台线程或前台线程(默认是False,前台线程)。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程执行完成后,程序才停止。
  5. getName()、setName()和name 用于获取和设置线程的名称。

3.1. 创建线程的两种方法

  1. 继承Thread类,然后重写run方法。
import threading

class MyThread(threading.Thread):
    def __init__(self, thread_name):
        super().__init__(name=thread_name)

    def run(self):
        print(self.name, "线程执行中....")

if __name__ == '__main__':
    mythread = MyThread('hello')
    mythread.start()
  1. 实例化threading.Thread对象的时候,将线程要执行的任务函数传入。

threading.Thread(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None)

参数group是预留的,用于将来扩展;
参数target是一个可调用对象,在线程启动后执行;
参数name是线程的名字。默认值为“Thread-N“,N是一个数字。
参数args和kwargs分别表示调用target时的参数列表和关键字参数。
import threading

def show(name):
    print(name, "线程执行中...")

if __name__ == '__main__':
    t = threading.Thread(target=show, args=('hello',))
    t.start()

3.2. 多线程

3.2.1. 基础实现

多线程执行过程中,每个线程各执行各自的任务,不等待其他的线程。

import threading
import time

def show(name):
    print(name, "线程开始执行")
    time.sleep(3)
    print(name, "线程执行完成")


if __name__ == '__main__':
    print("主线程开始执行")
    t = threading.Thread(target=show, args=('hello',), name='jjj')
    t.start()
    time.sleep(1)
    print("主线程执行完成")
主线程开始执行
hello 线程开始执行
主线程执行完成
hello 线程执行完成

python默认情况下会等待最后一个线程执行完毕后才退出(但是多进程的时候发现不是这样的,如果不使用join方法,程序则会在主进程执行完毕后退出)。

3.2.2. 等待子线程完成

import threading
import time

def show(name):
    print(name, "线程开始执行")
    time.sleep(3)
    print(name, "线程执行完成")


if __name__ == '__main__':
    print("主线程开始执行")
    t = threading.Thread(target=show, args=('hello',), name='jjj')
    t.start()
    time.sleep(1)
    t.join()
    print("主线程执行完成")
主线程开始执行
hello 线程开始执行
hello 线程执行完成
主线程执行完成

如果在join方法中增加参数值为1,此时会在join这个地方等待线程t执行1s,子线程t还未完成,但是已经超时,所以接着执行主线程,然后在执行线程t,此时的输出结果为:

主线程开始执行
hello 线程开始执行
主线程执行完成
hello 线程执行完成

3.2.3. 设置守护线程

当将所有子线程变成主线程的守护线程的时候,当主线程结束的时候,守护线程也会随之结束,整个程序也跟着退出。

import threading
import time

def show(name):
    time.sleep(3)
    print(name, "线程执行完成")


if __name__ == '__main__':
    print("主线程开始执行")
    for i in range(3):
        t = threading.Thread(target=show, args=('hello',), name='jjj')
        t.setDaemon(True)
        # t.daemon = True    # 或者这样
        t.start()
    time.sleep(1)

    print("主线程执行完成")
    print("此时活跃的线程数目", threading.active_count())
主线程开始执行
主线程执行完成
此时活跃的线程数目 4

活跃的线程数目有4个,包括1个主线程和3个子线程。

3.3. 自定义线程类

可以自定义线程类,让它继承Thread类,然后重写run方法。

import threading

class MyThread(threading.Thread):
    def __init__(self, func, args):
        super().__init__()
        self.func = func
        self.args = args

    def run(self):
        self.func(self.args)

def myfunc(args):
    # 这个方法体里面写想让线程做的事情。
    print("hello", args)

if __name__ == '__main__':
    mythread = MyThread(myfunc, 'world')
    mythread.start()

3.4. 线程锁

由于线程之间的任务执行时CPU进行随机调度的,并且每个线程可能只执行了n条指令后就被切换到别的线程了,当多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,这被称为“线程不安全”。为了保证数据安全,设计了县城所,即同一时刻只允许一个线程操作该数据。线程锁用于锁定资源,可以同时使用多个锁,当前需要独占某个资源的时候,任何一个锁都可以锁住这个资源。

import threading
import time

number = 0

def plus():
    global number       # global声明此处的number是外面的全局变量number
    for _ in range(1000000):    # 进行一个大数级别的循环加一运算
        number += 1
    print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))

for i in range(2):      # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=plus)
    t.start()


time.sleep(2)       # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)
子线程Thread-1运算结束后,number = 11747258
子线程Thread-2运算结束后,number = 12001468
主线程执行完毕后,number =  12001468

多运行几次,可以发现每次结果都不一样。正常来说,number的结果应该为2000000,但是实际结果却不是2000000,这是因为两个线程在运行过程中,CPU随机调度,你算一会我算一会,在没有对number进行数据保护的情况下,就发生了数据错误。

结果不为2000000的原因为:首先,number += 1的操作其实是分为3步的:1)从内存中取出number对应的值;2)将该值增加1;3)将新值保存到内存中。在线程1对number进行操作的时候,首先number为0,还没等线程1进行加1操作,就被切换到线程2,此时线程2从内存中读取到的number的值为0,完成加1操作后number为1.此时在切换到线程1的时候,接着执行线程1的加1操作,number变为1。从而两个线程都进行加1操作之后得到的结果为1而不是2.

可以使用join方法获取正确结果,此时线程就变成顺序执行,首先执行第一个线程对number进行修改,然后执行第二个线程对number进行修改。

for i in range(2):      
    t = threading.Thread(target=plus)
    t.start()
    t.join()   

正确的做法是使用线程锁,Python在threading中定义了几种线程锁类,分别是:

  1. Lock互斥锁
  2. RLock可重入锁
  3. Semaphore信号
  4. Event事件
  5. Condition条件
  6. Barrier’阻碍‘

3.4.1. 互斥锁Lock

互斥锁是一种独占锁,同一时刻只有一个线程可以访问共享的数据。首先,初始化锁对象,然后将锁对象传递给任务函数,在任务中加锁,使用后释放锁。

import threading
import time

number = 0
lock = threading.Lock()


def plus(lk):
    global number       # global声明此处的number是外面的全局变量number
    lk.acquire()
    for _ in range(10000000):    # 进行一个大数级别的循环加一运算
        number += 1
    print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
    lk.release()


for i in range(2):      # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=plus, args=(lock,))
    t.start()


time.sleep(2)       # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)

3.4.2. RLock

在同一个线程中,RLock.acquire()可以被多次调用,利用这个特性,可以解决部分死锁问题。参考here

3.4.3. 信号Semaphore

类名为BoundedSemaphore, 这种锁允许一定数量的线程同时更改数据,它不是互斥锁。比如地铁安检,排队的人很多,工作人员允许一定数量的人进入安检区,其他人继续排队。

import threading
import time

number = 0
lock = threading.BoundedSemaphore(5)


def plus(lk, n):
    lk.acquire()
    print("run the thread %s" % n)
    time.sleep(1)
    lk.release()


for i in range(20):      # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=plus, args=(lock, i))
    t.start()

通过运行结果可以看出,每5个结果输出一次。

3.5. 死锁

若干子系统在系统资源竞争的时候,都在等待对面对某部分资源解除占用状态,结果谁也不愿先解锁,互相干等着,程序无法执行下去。简单而言,也就是当两个线程相互等待对方释放资源时,就会发生死锁。

一旦产生死锁,整个程序既不会发生任何异常,也不会给出任何提示,所有线程处于阻塞状态,无法继续下去。

参考here

解决死锁的常用方式:

  1. 避免多次锁定。尽量避免同一个线程对多个对象进行锁定。
  2. 具有相同的加锁顺序。如果线程对多个对象进行锁定,应该保持他们以相同的顺序请求加锁。
  3. 使用定时锁。程序在调用acquire方法加锁的时候指定timeout参数,该参数超过timeout秒后自动释放对对象的锁定,就可以解开死锁了。
  4. 死锁检测。依靠算法机制实现死锁预防机制,主要针对那些不可能实现按序加锁,也不能使用定时锁的场景的。

4. 全局解释器锁GIL

参考here1here

GIL的全称是Global Interpreter Lock,它不是Python语言的特性,来源是Python设计之初的考虑,为了数据安全所做的决定。

每个CPU在同一时间只能执行一个线程(在单核CPU下的多线程其实都只是并发,不是并行,并发和并行从宏观上来讲都是同时处理多路请求的概念。但并发和并行又有区别,并行是指两个或者多个事件同一时刻发生;而并发是指两个或者多个事件在同一时间间隔内发生。)。那么是怎么执行多线程程序的呢?原理是:解释器的分时复用,即多个线程的代码,轮流被解释器执行,只不过切换的很频繁很快,给人一种多线程“同时”在执行的错觉,也就是“并发”。

一个Python进程中,GIL只有一个,在python多线程下,每个线程的执行方式为:
1.获取GIL;2.执行代码到sleep或者python虚拟机将其挂起;3.释放GIL。

而每次释放GIL后,线程进行锁竞争、切换线程都会消耗资源。并且由于GIL存在,python里一个进程只能同时执行一个线程(拿到GIL的线程才能执行),这也是为什么在多核CPU上,python的多线程效率不高的原因。

分类讨论python多线程在两种类型代码中的运行情况:

  1. CPU密集型代码(各种循环处理,计数等)。在这种情况下,ticks计数很快就会达到阈值,然后出发GIL的释放和再竞争(多个线程来回切换需要消耗资源),所在Python下的多线程对CPU密集型代码不友好。
  2. IO密集型代码(文件处理,网络爬虫等)。多线程能够有效提升效率,因为单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。

所以,Python的多线程对IO密集型代码比较友好。

ticks可以看作是python自身的一个计数器,专门做用于GIL,在python2.x中,当ticks计数达到100的时候就会释放GIL,然后归零。这个计数可以通过sys.setcheckinterval 来调整;在python3中,GIL不使用ticks计数,改为使用计时器,当执行时间达到阈值后,当前线程释放GIL,这样对CPU密集型程序更加友好,但是依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不如人意。

为什么IO密集型代码使用python能够提升效率,参考here

多核多线程比单核多线程更差,原因是在单核多线程下,每次释放GIL后,唤醒的那个线程都能获取到GIL锁,所以能够无缝执行,但是在多核下,CPU0释放GIL后,其他CPU上的线程都会进行竞争,但是GIL可能马上又被CPU0拿到,导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度状态,从而造成线程颠簸,导致效率更低。

python的GIL只能保证原子操作的线程安全。当全局资源存在写操作的时候,如果不能保证写入操作过程的原子性,会出现脏读脏写数据的情况,即线程不安全。可以通过加锁来保证线程安全。

结论:多核下,想做并行提升效率,比较通用的方法是使用多进程。原因是在每个进程有自己独立的GIL,互不干扰,这样就可以真正意义上的并行执行,所以在python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)。

5. 协程

参考here

5.1. 通俗理解协程和线程、进程之间的差异

进程/线程:操作系统提供的一种并发处理任务的能力。
协程:程序员通过高超的代码能力,在代码执行流程中人为的实现多任务并发,是单个线程内任务调度技巧。

多进程和多线程体现的是操作系统的能力,而协程体现的是程序员的流程控制能力。

三者关系:进程里面有线程,线程里面有协程。

全部评论

相关推荐

jack_miller:我给我们导员说我不在这里转正,可能没三方签了。导员说没事学校催的时候帮我想办法应付一下
点赞 评论 收藏
分享
像好涩一样好学:这公司我也拿过 基本明确周六加班 工资还凑活 另外下次镜头往上点儿
点赞 评论 收藏
分享
点赞 4 评论
分享
牛客网
牛客企业服务