1. Python并发

1.1 什么是并发?

并发的字典定义是同时发生。在Python中,同时发生的事物通过不同的名称(线程,任务,进程)进行调用,但在较高层次上,还是按顺序运行的指令序列。

并发类型 切换决定 处理器数量

抢先式多任务处理(threading) | 操作系统决定何时在Python外部切换任务 | 1个 | | 合作多任务处理(asyncio) | 这些任务决定何时放弃控制权 | 1个 | | 多重处理(multiprocessing) | 所有进程都同时在不同的处理器上运行 | 许多 |

1.2 并发适用场合

并发可以对两种类型的问题产生很大的影响,这两类问题通常称为CPU绑定I / O绑定

  • CPU绑定 较为常见的是文件系统和网络连接
  • I/O绑定 较为常见的是等待某些外部资源的输入/输出

Python 并发与多线程 - 图1
在上图中,蓝色框显示了程序执行工作的时间,红色框是等待I / O操作完成所花费的时间。
该图未按比例绘制,因为Internet上的请求可能比CPU指令花费几个数量级,因此程序最终可能会花费大部分时间等待。

另一方面,有些程序类无需进行网络计算或访问文件即可进行大量计算。这些是与CPU绑定的程序,因为限制程序速度的资源是CPU,而不是网络或文件系统。
Python 并发与多线程 - 图2

I / O绑定过程 CPU绑定进程
程序大部分时间都在与速度较慢的设备(例如网络连接,硬盘驱动器或打印机)进行通信。 您的程序大部分时间都花在CPU操作上。
加快速度涉及使等待这些设备所花费的时间重叠。 加快速度需要找到在相同的时间内进行更多计算的方法。

1.3 如何加快I/O限制程序

  1. import requests
  2. import time
  3. def download_site(url, session):
  4. with session.get(url) as response:
  5. print(f"Read {len(response.content)} from {url}")
  6. def download_all_sites(sites):
  7. with requests.Session() as session:
  8. for url in sites:
  9. download_site(url, session)
  10. if __name__ == "__main__":
  11. sites = [
  12. "https://www.jython.org",
  13. "http://olympus.realpython.org/dice",
  14. ] * 80
  15. start_time = time.time()
  16. download_all_sites(sites)
  17. duration = time.time() - start_time
  18. print(f"Downloaded {len(sites)} in {duration} seconds")

download_all_sites()创建Session,然后浏览网站列表,依次下载每个网站。最后,打印出此过程花费了多长时间。

  1. Downloaded 160 in 30.927271127700806 seconds

如果你运行的程序花费时间仅数秒,并且很少运行,则可能不值得添加并发性,如果你的程序经常运行该怎么办?我们可以利用threading模块实现并发。

1.4 threading版本

利用threading模块进行改写

  1. import concurrent.futures
  2. import requests
  3. import threading
  4. import time
  5. thread_local = threading.local()
  6. def get_session():
  7. if not hasattr(thread_local, "session"):
  8. thread_local.session = requests.Session()
  9. return thread_local.session
  10. def download_site(url):
  11. session = get_session()
  12. with session.get(url) as response:
  13. print(f"Read {len(response.content)} from {url}")
  14. def download_all_sites(sites):
  15. with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  16. executor.map(download_site, sites)
  17. if __name__ == "__main__":
  18. sites = [
  19. "https://www.jython.org",
  20. "http://olympus.realpython.org/dice",
  21. ] * 80
  22. start_time = time.time()
  23. download_all_sites(sites)
  24. duration = time.time() - start_time
  25. print(f"Downloaded {len(sites)} in {duration} seconds")
  1. Downloaded 160 in 10.224606275558472 seconds

本例中,当添加时threading,总体结构是相同的,只需要进行一些更改即可。download_all_sites()从每个站点调用一次功能更改为更复杂的结构。


在改写的程序中创建了ThreadPoolExecutor,ThreadPoolExecutor= Thread+ Pool+ Executor。

  • Pool对象将创建一个线程池,每个线程可以同时运行。
  • Executor部分将控制池中每个线程的运行方式和时间。

Python标准库将ThreadPoolExecutor作为上下文管理器(context manager),因此可以使用with语法来管理创建和释放线程池。

一旦有了ThreadPoolExecutor,就可以使用map()方法。此方法在列表中的每个item上运行对应的函数。最重要的是,它使用正在管理的线程池自动并发运行它们。这里, 每个线程都需要创建自己的requests.Session()对象。

因为操作系统控制着您的任务何时被中断以及另一个任务的启动时间,所以线程之间共享的任何数据都需要受到保护,或者是线程安全的。

根据数据是什么以及如何使用数据,有几种使数据访问线程安全的策略。其中之一是使用线程安全的数据结构,例如QueuePythonqueue模块中的数据。
这里使用的另一种策略是称为线程本地存储。threading.local()创建一个看起来像全局对象但特定于每个单独线程的对象。

当get_session()被调用时,session它查找特定于其正在运行的特定线程。因此,每个线程将在第一次调用时创建一个会话get_session(),然后在整个生命周期中仅在随后的每个调用中使用该会话。
这里贴一张执行的图示
Python 并发与多线程 - 图3
它使用多个线程来同时向网站发出多个打开的请求,从而使您的程序可以重叠等待时间,并更快地获得最终结果。

操作系统正在控制线程的运行时间以及何时将其换出以允许另一个线程运行。即使执行Python语句的子步骤,此线程也可以随时发生。

  1. import concurrent.futures
  2. counter = 0
  3. def increment_counter(fake_value):
  4. global counter
  5. for _ in range(100):
  6. counter += 1
  7. if __name__ == "__main__":
  8. fake_data = [x for x in range(5000)]
  9. counter = 0
  10. with concurrent.futures.ThreadPoolExecutor(max_workers=5000) as executor:
  11. executor.map(increment_counter, fake_data)

此代码与您在threading上面的示例中使用的结构非常相似。不同之处在于每个线程都在访问相同的全局变量 counter并对其进行递增。Counter不受任何方式的保护,因此它不是线程安全的。

为了递增counter,每个线程都需要读取当前值,将其添加一个,然后将该值保存回该变量。发生在以下行中:counter += 1。

因为操作系统对您的代码一无所知,并且可以在执行过程中的任何时候交换线程,所以这种交换有可能在线程读取值之后但有机会将其写回之前发生。如果正在运行的新代码也被修改counter,则第一个线程将拥有陈旧的数据副本,随之而来的麻烦将随之而来。

1.5 asyncio版本

在写asyncio版本之前,我们先学习一个asyncio的基本工作原理。

1.5.1 asyncio原理

这里仅学习其基本原理,省略了许多细节。
asyncio的基本概念是一个Python object,调用event loop,控制每个task运行的时间与方法。event loop知道每个任务并知道其处于什么状态。task则有多种状态可能,但为了学习原理,这里仅假设每个task仅有以下两个状态:

  1. Ready状态,表示任务已经准备好运行
  2. Waiting状态,表示任务处于等待某些外部事件完成的状态

简化的event loop需要维护两个任务列表,分别是Ready任务列表和Waiting任务列表。
event loop选取一个Ready列表中的任务去执行,将该任务放置到Waiting任务列表中,同时遍历Waiting任务列表中的任务,查看是否有完成响应的任务。
在Ready列表中的任务,如果没有被event loop执行到,则其状态一直保持Ready。

1.6 multiprocessing版本

不同于前面的方法,multiprocessing版本的代码可以充分利用多个CPU。

  1. import requests
  2. import multiprocessing
  3. import time
  4. session = None
  5. def set_global_session():
  6. global session
  7. if not session:
  8. session = requests.Session()
  9. def download_site(url):
  10. with session.get(url) as response:
  11. name = multiprocessing.current_process().name
  12. print(f"{name}:Read {len(response.content)} from {url}")
  13. def download_all_sites(sites):
  14. with multiprocessing.Pool(initializer=set_global_session) as pool:
  15. pool.map(download_site, sites)
  16. if __name__ == "__main__":
  17. sites = [
  18. "https://www.jython.org",
  19. "http://olympus.realpython.org/dice",
  20. ] * 80
  21. start_time = time.time()
  22. download_all_sites(sites)
  23. duration = time.time() - start_time
  24. print(f"Downloaded {len(sites)} in {duration} seconds")

Pool会创建单独的PYthon解释器进程,对迭代目标进行执行,主进程与其他进程之间通过multiprocessing模块实现通信。
创建Pool的行值得您注意。 首先,它不是指定在池中创建多少个进程的方法,尽管这是一个可选参数。 默认情况下,multiprocessing.Pool()将确定计算机中的CPU数量并进行匹配。 这通常是最好的答案,就我们而言。

在Pool中的每个进程都有自己的memory space。
Python 并发与多线程 - 图4
对于multiprocessing而言,在编程过程中需要思考哪些变量是需要被每个process都访问到的,需要设置部分全局变量,让multiprocess帮助你实现多进程间的通信。
需要注意的是,对于I/O限制的问题,multiprocessing不一定比threading快。

如何加速CPU限制的程序

对于I/O限制的程序,受限于等待外部操作,而对于CPU限制的问题,则取决于CPU的处理能力。

  1. import time
  2. def cpu_bound(number):
  3. return sum(i * i for i in range(number))
  4. def find_sums(numbers):
  5. for number in numbers:
  6. cpu_bound(number)
  7. if __name__ == "__main__":
  8. numbers = [5_000_000 + x for x in range(20)]
  9. start_time = time.time()
  10. find_sums(numbers)
  11. duration = time.time() - start_time
  12. print(f"Duration {duration} seconds")

1.7 multiprocessing模块

  1. import multiprocessing
  2. import time
  3. def cpu_bound(number):
  4. return sum(i * i for i in range(number))
  5. def find_sums(numbers):
  6. with multiprocessing.Pool() as pool:
  7. pool.map(cpu_bound, numbers)
  8. if __name__ == "__main__":
  9. numbers = [5_000_000 + x for x in range(20)]
  10. start_time = time.time()
  11. find_sums(numbers)
  12. duration = time.time() - start_time
  13. print(f"Duration {duration} seconds")

使用multiprocessing模块会有一些缺点,如果线程之间需要通信,会使程序变得复杂。

注意: CPU限制型的程序,只有通过multiprocessing可以有效地降低执行时间,threading和asyncio模块对这类问题无济于事。

2. Thread

线程是单独的执行流程。这意味着你的程序可以同时执行两个任务。但是对于大多数Python 3实现,不同的线程实际上并不会同时执行:它们看上去只是在同时执行。

在进行I/O限制型程序,例如 UDP监听,串口监听、拷贝文件等,可以利用threading.Thread,将对应的监听或等待的程序开一个线程。

  1. import logging
  2. import threading
  3. import time
  4. def thread_function(name):
  5. logging.info("Thread %s: starting", name)
  6. time.sleep(2)
  7. logging.info("Thread %s: finishing", name)
  8. if __name__ == "__main__":
  9. format = "%(asctime)s: %(message)s"
  10. logging.basicConfig(format=format, level=logging.INFO,
  11. datefmt="%H:%M:%S")
  12. logging.info("Main : before creating thread")
  13. x = threading.Thread(target=thread_function, args=(1,))
  14. logging.info("Main : before running thread")
  15. x.start()
  16. logging.info("Main : wait for the thread to finish")
  17. # x.join()
  18. logging.info("Main : all done")

守护进程线程
daemon是在后台运行的进程。Python的threading对daemon有更明确的含义,当程序退出时,daemon thread会自动关闭。
如果一个程序运行Thread而不是daemons,那么程序会一直等待这些threads完成后才关闭。
要告诉一个线程等待另一个线程结束,请调用.join()。如果取消注释该行,则主线程将暂停并等待该线程x完成运行。

2.2 ThreadPoolExecutor

与上面看到的线程相比,有一种更简单的方法来启动一组线程。
它称为ThreadPoolExecutor,并且是concurrent.futures(自Python 3.2起)标准库的一部分。
创建它的最简单方法是作为上下文管理器,使用该with语句来管理池的创建和销毁。

  1. import concurrent.futures
  2. # [rest of code]
  3. if __name__ == "__main__":
  4. format = "%(asctime)s: %(message)s"
  5. logging.basicConfig(format=format, level=logging.INFO,
  6. datefmt="%H:%M:%S")
  7. with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
  8. executor.map(thread_function, range(3))

该代码创建了ThreadPoolExecutor一个上下文管理器,告诉它在池中需要多少个工作线程。

注意:使用aThreadPoolExecutor可能会导致一些令人困惑的错误。 例如,如果调用不带参数的函数,但在.map()中将参数传递给,则线程将引发异常。 不幸的是,ThreadPoolExecutor将隐藏该异常,并且(在上述情况下)该程序将终止,没有任何输出。一开始调试起来可能会很混乱

2.3 竞争条件

当两个或多个线程访问共享的数据或资源时,可能会发生争用情况。

借助 threading.Lock()进行线程上锁,并通过with做上下文管理。

  1. class FakeDatabase:
  2. def __init__(self):
  3. self.value = 0
  4. self._lock = threading.Lock()
  5. def locked_update(self, name):
  6. logging.info("Thread %s: starting update", name)
  7. logging.debug("Thread %s about to lock", name)
  8. with self._lock:
  9. logging.debug("Thread %s has lock", name)
  10. local_copy = self.value
  11. local_copy += 1
  12. time.sleep(0.1)
  13. self.value = local_copy
  14. logging.debug("Thread %s about to release lock", name)
  15. logging.debug("Thread %s after release", name)
  16. logging.info("Thread %s: finishing update", name)

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

  1. from multiprocessing import Process, Queue
  2. import os, time, random
  3. #TODO: 写数据进程执行的代码
  4. def write(q):
  5. print('Process to write. {}'.format(os.getpid()))
  6. for value in ['A', 'B', 'C']:
  7. print('Put {} to queue...'.format(value))
  8. q.put(value)
  9. time.sleep(random.random())
  10. #TODO: 读数据进程执行的代码
  11. def read(q):
  12. print('Process to read. {}'.format(os.getpid()))
  13. while True:
  14. value = q.get(True)
  15. print('Get {} from queue.'.format(value))
  16. if __name__ == "__main__":
  17. # 父进程创建Queue,并传给各个子进程
  18. q = Queue()
  19. pw = Process(target=write, args=(q,))
  20. pr = Process(target=read, args=(q,))
  21. # 启动子进程
  22. pw.start()
  23. pr.start()
  24. pw.join()
  25. pr.terminate()

3. Context Manager

打开文件,可以使用

  1. f = open('hello.txt', 'w')
  2. try:
  3. f.write('hello, world')
  4. finally:
  5. f.close()

当然,可以使用如下代码

  1. with open('hello.txt', 'w') as f:
  2. f.write('hello, world!')

另一个实际的例子:
涉及到线程锁时

  1. some_lock = threading.Lock()
  2. # Harmful:
  3. some_lock.acquire()
  4. try:
  5. # Do something...
  6. finally:
  7. some_lock.release()
  8. # Better:
  9. with some_lock:
  10. # Do something...

3.1 写一个自己的context manager

针对context manager,写 enterexit 两个方法后的目标,可以作为context manager。
Python会调用这两个方法在资源管理的Cycle过程中。```

  1. class ManagedFile:
  2. def __init__(self, name):
  3. self.name = name
  4. def __enter__(self):
  5. self.file = open(self.name, 'w')
  6. return self.file
  7. def __exit__(self, exc_type, exc_val, exc_tb):
  8. if self.file:
  9. self.file.close()
  1. with ManagedFile('hello.txt') as f:
  2. f.write('hello, world!')
  3. f.write('bye now')

基于类的contexxt manger并不是唯一的方法,标准库中的 contextlib utility 模块提供了一些更加抽象的context manager protocol,便于更好地创建context manager

  1. @contextmanager
  2. def managed_file(name):
  3. try:
  4. f = open(name, 'w')
  5. yield f
  6. finally:
  7. f.close()
  8. >>> with managed_file('hello.txt') as f:
  9. ... f.write('hello, world!')
  10. ... f.write('bye now')


4. Generator

通过列表生成式,我们可以直接创建一个列表。但是,受到内存限制,列表容量肯定是有限的。而且,创建一个包含100万个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了。
所以,如果列表元素可以按照某种算法推算出来,那我们是否可以在循环的过程中不断推算出后续的元素呢?这样就不必创建完整的list,从而节省大量的空间。在Python中,这种一边循环一边计算的机制,称为生成器:generator。
要创建一个generator,有很多种方法。第一种方法很简单,只要把一个列表生成式的[]改成(),就创建了一个generator:

  1. L = [x * x for x in range(10)]
  2. # generator
  3. g = (x * x for x in range(10))

如果要打印g的每一个元素,可以使用 next函数逐一获取。

前面提到,generator保存的是算法,每次调用next(g),就计算出g的下一个元素的值,直到计算到最后一个元素,没有更多的元素时,抛出StopIteration的错误。
可以配合for循环进行元素的查看,所以,我们创建了一个generator后,基本上永远不会调用next(),而是通过for循环来迭代它,并且不需要关心StopIteration的错误。

如果一个函数定义中包含yield关键字,那么这个函数就不再是一个普通函数,而是一个generator。

generator的函数,在每次调用next()的时候执行,遇到yield语句返回,再次执行时从上次返回的yield语句处继续执行。

  1. def fib(max):
  2. n, a, b = 0, 0, 1
  3. while n < max:
  4. yield b
  5. a, b = b, a + b
  6. n = n + 1
  7. return 'done'
  8. g = fib(6)
  9. while True:
  10. try:
  11. x = next(g)
  12. print('g:', x)
  13. except StopIteration as e:
  14. print('Generator return value:', e.value)
  15. break