layout: posttitle: python并发编程
subtitle: 多线程-多进程-线程池-进程池
date: 2019-10-30
author: NSX
header-img: img/post-bg-2015.jpg
catalog: true
tags:
- 多线程多进程
- python并发

  • 基础知识

    • 什么是进程/线程/二者的关系
    • 并发和并行有啥区别?同步和异步有啥区别?
    • GIL是什么?
  • 多线程

    • threading.Thread
  • 线程池

    • ThreadPoolExecutor
  • 多进程

    • multiprocessing.Process
  • 进程池

    • multiprocessing.Pool() 以及ProcessPoolExecutor
  • 异步IO

    • 协程、多线程、多进程的区别?

      导读

进程是系统进行资源分配的最小单位,线程是系统进行调度执行的最小单位;

一个应用程序至少包含一个进程,一个进程至少包含一个线程;

每个进程在执行过程中拥有独立的内存空间,而一个进程中的线程之间是共享该进程的内存空间的;

  • 计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。
  • 假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其他车间都必须停工。背后的含义就是,单个CPU一次只能运行一个任务。编者注: 多核的CPU就像有了多个发电厂,使多工厂(多进程)实现可能。
  • 进程就好比工厂的车间,它代表CPU所能处理的单个任务。任一时刻,CPU总是运行一个进程,其他进程处于非运行状态。
  • 一个车间里,可以有很多工人。他们协同完成一个任务。
  • 线程就好比车间里的工人。一个进程可以包括多个线程。
  • 车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的。这象征一个进程的内存空间是共享的,每个线程都可以使用这些共享内存。
  • 可是,每间房间的大小不同,有些房间最多只能容纳一个人,比如厕所。里面有人的时候,其他人就不能进去了。这代表一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。
  • 一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫”互斥锁”(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。
  • 还有些房间,可以同时容纳n个人,比如厨房。也就是说,如果人数大于n,多出来的人只能在外面等着。这好比某些内存区域,只能供给固定数目的线程使用。
  • 这时的解决方法,就是在门口挂n把钥匙。进去的人就取一把钥匙,出来时再把钥匙挂回原处。后到的人发现钥匙架空了,就知道必须在门口排队等着了。这种做法叫做”信号量”(Semaphore),用来保证多个线程不会互相冲突。
  • 不难看出,mutex是semaphore的一种特殊情况(n=1时)。也就是说,完全可以用后者替代前者。但是,因为mutex较为简单,且效率高,所以在必须保证资源独占的情况下,还是采用这种设计。

一些示例来阐明线程或进程更合适用在哪:

  • 多线程使用场景:IO操作密集的场景,比如爬虫,web访问等,需要频繁从网络、硬盘、内存等读写数据。这种情况 下,因为单线程下的IO操作会有IO等待,造成不必要的时间浪费,因此采用多线程就能在线程A等待时,开启线程B的操作。

  • 多进程使用场景:CPU计算密集的场景,比如科学计算、转换或清洗大型数据集、循环处理等。这些场景因为计算工作量大,由于 GIL 加锁和释放问题,多线程相比单线程更慢

  • 请记住,由于线程的工作方式不同,它们可能比进程具有更高的内存效率。因此,在不需要时使用大量进程会导致内存膨胀。

最重要的是,请尽量避免考虑尽可能的进程和线程,并尽可能使用numpy等科学计算库并编写矢量化的操作。始终需要了解正在使用的库或框架(尤其是数值计算库和其他数据科学库)中可用的并发工具,并在适当时考虑使用它们。

1. 基础知识

什么是进程?

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

什么是线程?

线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了进程并发执行的开销,提高了操作系统的并发性能,线程没有自己的系统资源。

进程和线程的关系

  • 资源分配给进程,同一进程的所有线程共享该进程的所有资源。
  • 一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。

并发 vs. 并行

并发和并行是对孪生兄弟,概念经常混淆。并发是指能够多任务处理,并行则是是能够同时多任务处理。

  • 并发:是伪并行,即看起来是同时运行,实际是单个CPU+多道技术;
  • 并行:同时运行,只有具备多个CPU才能实现并行;

2019-10-30-Python并发编程(线程 vs 进程) - 图1

同步 vs. 异步

同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去。

异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。

GIL是什么?

  • GIL(Global Interpreter Lock)是最流程的 CPython 解释器中的一个技术术语,中文译为全局解释器锁。
  • GIL 的功能是:在 CPython 解释器中执行的每一个 Python 线程,都会先锁住自己,以阻止别的线程执行。

无论启动多少个线程,有多少个CPU,Python在执行一个进程的时候在同一时刻只允许一个线程运行。 所以,Python是无法利用多核CPU实现多线程的。因此,Python的多线程不适用于CPU密集型任务,但在IO密集型任务上能够节省时间。

为什么需要GIL ?

因为Python的线程是调用操作系统的原生线程,这个原生线程是用C语言写的。CPython启动线程的时候调用的C语言的接口。

每个线程在执行的过程中,Python解释器是控制不了的,只能等结果。如果多个线程一起执行,那结果就不一定正确了。有了GIL,就可以在同一时间只有一个线程能够工作,可以为了避免出错。

需要注意的是,GIL并不是Python的特性,Python完全可以不依赖于GIL,很多Python解释器是没有GIL的。

2. 多线程

Python中实现多线程的并发需要使用threading模块。

线程对象的创建

  • Thread类直接创建
  1. import threading
  2. if __name__ == '__main__':
  3. #初始化线程
  4. t1 = threading.Thread(target=task_thread, args=(3,))
  5. #开启线程
  6. t1.start()
  7. #等待运行结束
  8. t1.join()
  • Thread类继承式创建,重写run方法

重写 Thread 类 run() 方法来实现逻辑,这个方法是线程的入口。线程被创建之后并不会马上运行,需要手动调用 start()join() 让调用它的线程一直等待直到执行结束(即阻塞调用它的主线程, t 线程执行结束,主线程才会继续执行)

  1. import threading
  2. class MyThreading(threading.Thread):
  3. def __init__(self,func,arg):
  4. super(MyThreading,self).__init__()
  5. self.func = func
  6. self.arg = arg
  7. def run(self):
  8. self.func(self.arg)
  9. print('%s say hello' % self.name)
  10. def f1(args):
  11. print(args)
  12. if __name__ == '__main__':
  13. obj = MyThreading(f1, 123)
  14. obj.start()
  15. print('主线程')

线程对象的实例方法和属性

  • t.start():激活线程,线程被CPU调度后会自动执行t.run(),run方法不要手动调用

  • t.join():父线程阻塞直到该子线程执行结束

  • t.daemon:声明为守护线程,需在start方法前调用,默认为False。值为True时主进程结束后子线程一起结束,为False时主进程会等待子线程结束后才退出

  • t.is_alive():判断线程是否为激活状态

  • t.getName():获取线程的名称

  • t.setName():设置线程的名称

线程锁Lock

加锁是为了对锁内资源(变量)进行锁定,避免其他线程篡改已被锁定的资源,以达到我们预期的效果

CPU执行任务时,在线程之间是进行随机调度的,并且每个线程可能只执行n条代码后就转而执行另外一条线程。由于在一个进程中的多个线程之间是共享资源和数据的,这就容易造成资源抢夺或脏数据,于是就有了锁的概念,限制某一时刻只有一个线程能访问某个指定的数据。锁通常被用来实现对共享资源的同步访问。

python在 threading 模块中定义了几种线程锁类,分别是:

  • Lock 普通锁(不可嵌套)
  • RLock 普通锁(可嵌套)
  • Semaphore 信号量
  • event 事件
  • condition 条件

代码示例

为每一个共享资源创建一个 Lock 对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁。未加锁部分并发执行,加锁部分串行执行,举例说明:

  1. import threading
  2. import time
  3. num = 100
  4. def fun_sub():
  5. #未加锁的代码并发运行
  6. time.sleep(3)
  7. global num
  8. print('现在操作共享资源的线程名字是:',t.name)
  9. #加锁的代码串行运行
  10. lock.acquire()
  11. num2 = num
  12. time.sleep(0.001)
  13. num = num2-1
  14. lock.release()
  15. if __name__ == '__main__':
  16. print('开始测试同步锁 at %s' % time.ctime())
  17. lock = threading.Lock() #创建一把同步锁
  18. thread_list = []
  19. for thread in range(100):
  20. t = threading.Thread(target=fun_sub)
  21. t.start()
  22. thread_list.append(t)
  23. for t in thread_list:
  24. t.join()
  25. print('num is %d' % num) # 0
  26. print('结束测试同步锁 at %s' % time.ctime())

注意的是,lock.acquire()lock.release() 必须成对出现。否则就有可能造成死锁!

很多时候,我们虽然知道,他们必须成对出现,但是还是难免会有忘记的时候。为了规避这个问题。我推荐使用使用上下文管理器 with来加锁。with 语句更加优雅,也更不容易出错,会在这个代码块执行前自动获取锁,在执行结束后自动释放锁。

  1. import threading
  2. lock = threading.Lock()
  3. with lock:
  4. # 这里写自己的代码
  5. pass

GIL VS Lock

2019-10-30-Python并发编程(线程 vs 进程) - 图2

  • 锁(Lock)是指一个进程对内存资源进行排他占用的状态;

  • 死锁(Deadlock)两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁;

两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为 死锁进程.

线程间通信

线程的一个关键特性是每个线程都是独立运行且状态不可预测。在某些场景下,需要进行线程间的通信。

threading.Event():包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为False。如果有线程等待一个Event对象,那么这个线程将会被一直阻塞直至该标志为True。

threading.Semaphore():可用于设置最大并发数。Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1,调用release() 时内置计数器+1,计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

threading.Condition():一个条件变量对象允许一个或多个线程在被其它线程所通知之前进行等待。

代码示例-略

  1. 先说不使用Queue获取多线程的返回值的方法(参考博客
  2. 使用Queue获取多线程的返回值的方法

3. 线程池

参考:《Python线程池 ThreadPoolExecutor 的用法及实战》 python 多线程与线程池

线程池中的线程可以得到重用,避免了频繁的创建新的线程,同时线程数量也是可控的。

从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,该模块提供异步执行可调用对象高层接口,是Python的原生模块,提供了线程池 ThreadPoolExecutor 和进程池 ProcessPoolExecutor 的实现。

ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):Executor 子类使用最多 max_workers 个线程的线程池来异步执行调用(默认值是 CPU 个数的 5 倍!)。以下是它的方法:

  • submit(fn, *args, **kwargs):调度可调用对象 fn,以 fn(*args **kwargs) 方式执行并返回 Future 对象。

  • map(func, *iterables, timeout=None, chunksize=1):类似于 map(func, *iterables)

  • shutdown(wait=True):当待执行的期程完成执行后向执行者发送信号,它就会释放正在使用的任何资源。

优点:

  1. 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
  2. 当一个线程完成的时候,主线程能够立即知道。
  3. 让多线程和多进程的编码接口一致。

线程池的基本使用

  1. # coding: utf-8
  2. from concurrent.futures import ThreadPoolExecutor
  3. import time
  4. def spider(page):
  5. time.sleep(page)
  6. print(f"crawl task{page} finished")
  7. return page
  8. with ThreadPoolExecutor(max_workers=5) as t: # 创建一个最大容纳数量为5的线程池
  9. task1 = t.submit(spider, 1)
  10. task2 = t.submit(spider, 2) # 通过submit提交执行的函数到线程池中
  11. task3 = t.submit(spider, 3)
  12. print(f"task1: {task1.done()}")
  13. print(f"task2: {task2.done()}")
  14. print(f"task3: {task3.done()}")
  15. time.sleep(2.5)
  16. print(f"task1: {task1.done()}")
  17. print(f"task2: {task2.done()}")
  18. print(f"task3: {task3.done()}")
  19. try:
  20. def get_result(future):
  21. print(future.result()) # 通过result来获取返回值
  22. # 为task添加线程完成的回调函数
  23. task1.add_done_callback(get_result)
  24. task2.add_done_callback(get_result)
  25. task3.add_done_callback(get_result)
  26. except Exception as e:
  27. print('raise an exception: {}'.format(e))

代码讲解:

  1. 使用 with 语句 ,通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。
  2. 使用 submit 函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。
  3. 通过使用 done() 方法判断该任务是否结束。上面的例子可以看出,提交任务后立即判断任务状态,显示四个任务都未完成。在延时2.5后,task1 和 task2 执行完毕,task3 仍在执行中。
  4. 使用 result() 方法可以获取任务的返回值。
  5. 使用 add_done_callback() 方法来获取线程任务的返回值

线程池的主要方法

map

concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:

  • map可以保证输出的顺序, submit输出的顺序是乱的
  • 如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
  • submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。
  1. with ProcessPoolExecutor(max_workers=3) as executor:
  2. for result in executor.map(func, [2, 3, 1, 4]): # 返回线程执行的结果
  3. print(result)

wait

  1. from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
  2. import time
  3. def spider(page):
  4. time.sleep(page)
  5. print(f"crawl task{page} finished")
  6. return page
  7. with ThreadPoolExecutor(max_workers=5) as t:
  8. all_task = [t.submit(spider, page) for page in range(1, 5)]
  9. wait(all_task, return_when=FIRST_COMPLETED)
  10. print('finished')
  11. print(wait(all_task, timeout=2.5))
  1. 代码中返回的条件是:当完成第一个任务的时候,就停止等待,继续主线程任务
  2. 由于设置了延时, 可以看到最后只有 task4 还在运行中

√as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。最好的方法是当某个任务结束了,就给主线程返回结果,而不是一直判断每个任务是否结束。

ThreadPoolExecutor 中 的 as_completed() 就是这样一个方法,当子线程中的任务执行完后,直接用 result() 获取返回结果。用法如下:

  1. # coding: utf-8
  2. from concurrent.futures import ThreadPoolExecutor, as_completed
  3. import time
  4. def spider(page):
  5. time.sleep(page)
  6. print(f"crawl task{page} finished")
  7. return page
  8. def main():
  9. with ThreadPoolExecutor(max_workers=20) as t:
  10. obj_list = []
  11. for page in range(1, 5):
  12. obj = t.submit(spider, page)
  13. obj_list.append(obj)
  14. for future in as_completed(obj_list):
  15. data = future.result()
  16. print(f"main: {data}")
  17. # 执行结果
  18. crawl task1 finished
  19. main: 1
  20. crawl task2 finished
  21. main: 2
  22. crawl task3 finished
  23. main: 3
  24. crawl task4 finished
  25. main: 4

as_completed() 方法是一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。

当有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。同时,先完成的任务会先返回给主线程。

4. 多进程

multiprocessing 是一个用与 threading 模块相似API的支持产生进程的包。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免GIL带来的影响。

4.1 进程对象的创建

Process类直接创建

Process类继承式创建,重写run方法

4.2 进程对象的实例方法和属性

p.start():启动进程

p.join([timeout]):父进程阻塞直到该子进程执行结束

p.is_alive():判断进程是否为激活状态

p.terminate/kill():终止进程

p.daemon:声明为守护进程,需在start方法前调用,默认为False。值为True时进程结束后子进程一起结束,为False时主进程会等待子进程结束后才退出

p.pid:返回进程ID

p.exitcode:返回进程退出代码

  1. import multiprocessing as mp
  2. p = mp.Process(target=task, args=(i,))
  3. p.start()
  4. p.join()

4.3 multiprocessing模块提供的方法

multiprocessing.current_process():返回与当前进程相对应的 Process 对象

multiprocessing.active_children():返回当前进程存活的子进程的列表

multiprocessing.set_executable():设置在启动子进程时使用的 Python 解释器路径

4.4 进程间通信

  • multiprocessing.Event():发送事件信号,与threading.Event()类似。

  • multiprocessing.Semaphore():可控制最大并发数,类似于threading.Semaphore。

  • multiprocessiong.Queue():主要用来在多个进程之间实现通信。Queue是多进程安全队列,Queue通过put和get方法来实现多进程之间的数据传递。

  • multiprocessiong.Pipe():Pipe常用来在两个进程之间实现通信。该方法返回一个二元元组 (conn1,conn2),代表一个管道的两端。Pipe方法有个duplex参数,默认为True,表示该管道处于全双工模式下,conn1和conn2都可以进行收发。当其为False时,表示该管道处于半双工模式下,conn1只能进行接收消息,conn2只能发送消息。send和recv方法分别是发送和接收消息的方法。

  • 共享内存:multiprocessing.Value(),multiprocessing.Array()。可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

  • multiprocessing.Manager():数据管理器,控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

5. 进程池

进程池可以减少进程创建和释放的开销。multiprocessing.Pool() 描述了一个工作进程池,常用方法有:

  • apply(func[, args[, kwds]]):使用arg和kwds参数调用func函数,结果返回前会一直阻塞,且func函数仅被pool中的一个进程运行。

  • apply_async(func[,args[,kwds[,callback[,error_callback]]]]):apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 callback应被立即完成,否则处理结果的线程会被阻塞。

  • close():阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  • terminate():不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

  • join():wait工作线程的退出,在调用join()前,必须调用close()或terminate()。

  • map(func, iterable[, chunksize])

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])

除此之外,还可以使用concurrent.futures.ProcessPoolExecutor(),接口与ThreadPoolExecutor线程池相同。

ProcessPoolExecutor的基本使用

从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和 ProcessPoolExecutor (进程池)两个类,是对 threading 和 multiprocessing 的进行了高级别的抽象, 暴露出统一的接口,帮助开发者非常方便的实现异步调用。

  1. # coding: utf-8
  2. from concurrent.futures import ProcessPoolExecutor
  3. import time
  4. def spider(page):
  5. time.sleep(page)
  6. print(f"crawl task{page} finished")
  7. return page
  8. with ProcessPoolExecutor() as p: #不填则默认为cpu的个数
  9. task1 = p.submit(spider, 1)
  10. task2 = p.submit(spider, 2) # 通过submit提交执行的函数到线程池中
  11. task3 = p.submit(spider, 3)
  12. try:
  13. def get_result(future):
  14. print(future.result()) # 通过result来获取返回值
  15. # 为task添加线程完成的回调函数
  16. task1.add_done_callback(get_result)
  17. task2.add_done_callback(get_result)
  18. task3.add_done_callback(get_result)
  19. except Exception as e:
  20. print('raise an exception: {}'.format(e))

6. 异步IO

参考资料:

《第1章:I/O Models 阻塞/非阻塞 同步/异步》

《第2章:Python 并发编程》

《第8章:使用 asyncio 模块实现并发》

《第9章:使用 asyncio + aiohttp 并发下载》

首先我们看一下多进程、多线程、异步 IO 这三者的区别,以请求一个网页为例,如下图所示,把这类任务抽象成3部分,绿色部分代表请求前以及请求前的相关运算处理工作;白色部分代表请求中,等待远程服务器返回结果;蓝色部分代表得到服务器返回结果,并进行相关处理。

协程、多线程、多进程的区别?

2019-10-30-Python并发编程(线程 vs 进程) - 图3

  • 单线程的网络IO部分,是会阻塞程序的运行;发出请求到请求返回的这段时间,该程序会占着CPU的该线程的坑位。(注:所谓的单线程就是一个进程只开一个线程)

如上所示,单线程的网络IO部分,是会阻塞程序的运行;发出请求到请求返回的这段时间,该程序会占着CPU的该线程的坑位。(注:所谓的单线程就是一个进程只开一个线程)

  • 多进程

    • 抢占式多任务(preemptive),同步并行的,由操作系统调度
    • 多进程是三种并发模式中唯一可以使用多核 CPU 的模式
  • 多线程

    • 抢占式多任务(preemptive),同步并行的,由操作系统调度
    • Python 默认的解释器 CPython 由于 GIL 的存在,不能使用多核 CPU,只能运行在一个核心上
    • 线程是不安全的,线程会是挂的,经常你争我夺,当然会没有那么稳定;
    • 每个线程也具有独立的资源,比如栈、寄存器等;
    • 另外线程之间的切换也会造成额外资源的开销
  • 协程★★

    • 协程,又称为微线程,英文名为Coroutine它的本质还是一个单线程,在单线程内实现调度,而不是在CPU层面切换线程,从而避免不必要的开销。
    • 异步是指在 单线程并发 执行多个任务。当一个任务在等待数据时,它会释放 CPU 资源,转而执行其它任务,通过程序员自己主动切换任务来最小化空闲时间(IO等待的时间),这就是所谓的异步非阻塞

异步 I/O 操作

多进程多线程 方案中,操作系统不可能无上限地增加进程或线程,一方面会占用大量内存,影响系统稳定性;另一方面 上下文切换 的开销也很大,一旦进程或线程的数量过多时,CPU 的大部分时间就花在 上下文切换 上了,真正运行代码的时间就少了,结果是导致性能严重下降。

那么,有没有什么办法可以减少大量进程或者线程的创建产生的大量内存占用?其实是有的,就是利用所谓的线程池或者进程池;既然减少了创建和销毁对象产生的开销,那么进程或者线程切换的开销有没有办法减少呢?其实是有的,我们直接使用 异步 IO 就可以了。

异步 I/O 框架中,使用 单线程,利用 事件循环,不断地重复 “监控到事件发生 —> 处理事件” 这一过程。同时,还要把每个 阻塞型操作(blocking operation ) 替换成 非阻塞的异步调用(non-blocking asynchronous call),当某个任务中遇到耗时的 I/O 操作时,才会把控制权 交还事件循环,然后 事件循环 会执行另一个任务。这样就可以避免阻塞型调用中止整个应用程序的进程,合理地解决了 CPU 高速执行能力和 I/O 设备的龟速严重不匹配问题。

异步 I/O 操作 是指,你发起一个 I/O 操作(比如,等待网络图片数据的到来),却不用等它结束,你可以继续去做其它的事情,当它结束时,你会得到通知,然后再回来接着处理这个 I/O 后续的操作。而 同步 I/O 操作 则会被阻塞在 I/O 操作上直到它完成,这期间 CPU 做了很多事,只是没有运行你的程序

Python 的异步 IO 相关模块非常多

  • 比如 aiohttp、gevent……这里主要介绍 aiohttp 模块

参考