1. Python并发

1.1 什么是并发?


并发类型 切换决定 处理器数量

抢先式多任务处理(threading) | 操作系统决定何时在Python外部切换任务 | 1个 | | 合作多任务处理(asyncio) | 这些任务决定何时放弃控制权 | 1个 | | 多重处理(multiprocessing) | 所有进程都同时在不同的处理器上运行 | 许多 |

1.2 并发适用场合

并发可以对两种类型的问题产生很大的影响,这两类问题通常称为CPU绑定I / O绑定

  • CPU绑定 较为常见的是文件系统和网络连接
  • I/O绑定 较为常见的是等待某些外部资源的输入/输出

在上图中,蓝色框显示了程序执行工作的时间,红色框是等待I / O操作完成所花费的时间。

I / O绑定过程 CPU绑定进程
程序大部分时间都在与速度较慢的设备(例如网络连接,硬盘驱动器或打印机)进行通信。 您的程序大部分时间都花在CPU操作上。
加快速度涉及使等待这些设备所花费的时间重叠。 加快速度需要找到在相同的时间内进行更多计算的方法。

1.3 如何加快I/O限制程序

  1. import requests
  2. import time
  3. def download_site(url, session):
  4. with session.get(url) as response:
  5. print(f"Read {len(response.content)} from {url}")
  6. def download_all_sites(sites):
  7. with requests.Session() as session:
  8. for url in sites:
  9. download_site(url, session)
  10. if __name__ == "__main__":
  11. sites = [
  12. "https://www.jython.org",
  13. "http://olympus.realpython.org/dice",
  14. ] * 80
  15. start_time = time.time()
  16. download_all_sites(sites)
  17. duration = time.time() - start_time
  18. print(f"Downloaded {len(sites)} in {duration} seconds")


  1. Downloaded 160 in 30.927271127700806 seconds


1.4 threading版本


  1. import concurrent.futures
  2. import requests
  3. import threading
  4. import time
  5. thread_local = threading.local()
  6. def get_session():
  7. if not hasattr(thread_local, "session"):
  8. thread_local.session = requests.Session()
  9. return thread_local.session
  10. def download_site(url):
  11. session = get_session()
  12. with session.get(url) as response:
  13. print(f"Read {len(response.content)} from {url}")
  14. def download_all_sites(sites):
  15. with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  16. executor.map(download_site, sites)
  17. if __name__ == "__main__":
  18. sites = [
  19. "https://www.jython.org",
  20. "http://olympus.realpython.org/dice",
  21. ] * 80
  22. start_time = time.time()
  23. download_all_sites(sites)
  24. duration = time.time() - start_time
  25. print(f"Downloaded {len(sites)} in {duration} seconds")
  1. Downloaded 160 in 10.224606275558472 seconds


在改写的程序中创建了ThreadPoolExecutor,ThreadPoolExecutor= Thread+ Pool+ Executor。

  • Pool对象将创建一个线程池,每个线程可以同时运行。
  • Executor部分将控制池中每个线程的运行方式和时间。

Python标准库将ThreadPoolExecutor作为上下文管理器(context manager),因此可以使用with语法来管理创建和释放线程池。

一旦有了ThreadPoolExecutor,就可以使用map()方法。此方法在列表中的每个item上运行对应的函数。最重要的是,它使用正在管理的线程池自动并发运行它们。这里, 每个线程都需要创建自己的requests.Session()对象。



  1. import concurrent.futures
  2. counter = 0
  3. def increment_counter(fake_value):
  4. global counter
  5. for _ in range(100):
  6. counter += 1
  7. if __name__ == "__main__":
  8. fake_data = [x for x in range(5000)]
  9. counter = 0
  10. with concurrent.futures.ThreadPoolExecutor(max_workers=5000) as executor:
  11. executor.map(increment_counter, fake_data)

此代码与您在threading上面的示例中使用的结构非常相似。不同之处在于每个线程都在访问相同的全局变量 counter并对其进行递增。Counter不受任何方式的保护,因此它不是线程安全的。

为了递增counter,每个线程都需要读取当前值,将其添加一个,然后将该值保存回该变量。发生在以下行中:counter += 1。


1.5 asyncio版本


1.5.1 asyncio原理

asyncio的基本概念是一个Python object,调用event loop,控制每个task运行的时间与方法。event loop知道每个任务并知道其处于什么状态。task则有多种状态可能,但为了学习原理,这里仅假设每个task仅有以下两个状态:

  1. Ready状态,表示任务已经准备好运行
  2. Waiting状态,表示任务处于等待某些外部事件完成的状态

简化的event loop需要维护两个任务列表,分别是Ready任务列表和Waiting任务列表。
event loop选取一个Ready列表中的任务去执行,将该任务放置到Waiting任务列表中,同时遍历Waiting任务列表中的任务,查看是否有完成响应的任务。
在Ready列表中的任务,如果没有被event loop执行到,则其状态一直保持Ready。

1.6 multiprocessing版本


  1. import requests
  2. import multiprocessing
  3. import time
  4. session = None
  5. def set_global_session():
  6. global session
  7. if not session:
  8. session = requests.Session()
  9. def download_site(url):
  10. with session.get(url) as response:
  11. name = multiprocessing.current_process().name
  12. print(f"{name}:Read {len(response.content)} from {url}")
  13. def download_all_sites(sites):
  14. with multiprocessing.Pool(initializer=set_global_session) as pool:
  15. pool.map(download_site, sites)
  16. if __name__ == "__main__":
  17. sites = [
  18. "https://www.jython.org",
  19. "http://olympus.realpython.org/dice",
  20. ] * 80
  21. start_time = time.time()
  22. download_all_sites(sites)
  23. duration = time.time() - start_time
  24. print(f"Downloaded {len(sites)} in {duration} seconds")

创建Pool的行值得您注意。 首先,它不是指定在池中创建多少个进程的方法,尽管这是一个可选参数。 默认情况下,multiprocessing.Pool()将确定计算机中的CPU数量并进行匹配。 这通常是最好的答案,就我们而言。

在Pool中的每个进程都有自己的memory space。
  1. import time
  2. def cpu_bound(number):
  3. return sum(i * i for i in range(number))
  4. def find_sums(numbers):
  5. for number in numbers:
  6. cpu_bound(number)
  7. if __name__ == "__main__":
  8. numbers = [5_000_000 + x for x in range(20)]
  9. start_time = time.time()
  10. find_sums(numbers)
  11. duration = time.time() - start_time
  12. print(f"Duration {duration} seconds")

1.7 multiprocessing模块

  1. import multiprocessing
  2. import time
  3. def cpu_bound(number):
  4. return sum(i * i for i in range(number))
  5. def find_sums(numbers):
  6. with multiprocessing.Pool() as pool:
  7. pool.map(cpu_bound, numbers)
  8. if __name__ == "__main__":
  9. numbers = [5_000_000 + x for x in range(20)]
  10. start_time = time.time()
  11. find_sums(numbers)
  12. duration = time.time() - start_time
  13. print(f"Duration {duration} seconds")


注意: CPU限制型的程序,只有通过multiprocessing可以有效地降低执行时间,threading和asyncio模块对这类问题无济于事。

2. Thread

线程是单独的执行流程。这意味着你的程序可以同时执行两个任务。但是对于大多数Python 3实现,不同的线程实际上并不会同时执行:它们看上去只是在同时执行。

在进行I/O限制型程序,例如 UDP监听,串口监听、拷贝文件等,可以利用threading.Thread,将对应的监听或等待的程序开一个线程。

  1. import logging
  2. import threading
  3. import time
  4. def thread_function(name):
  5. logging.info("Thread %s: starting", name)
  6. time.sleep(2)
  7. logging.info("Thread %s: finishing", name)
  8. if __name__ == "__main__":
  9. format = "%(asctime)s: %(message)s"
  10. logging.basicConfig(format=format, level=logging.INFO,
  11. datefmt="%H:%M:%S")
  12. logging.info("Main : before creating thread")
  13. x = threading.Thread(target=thread_function, args=(1,))
  14. logging.info("Main : before running thread")
  15. x.start()
  16. logging.info("Main : wait for the thread to finish")
  17. # x.join()
  18. logging.info("Main : all done")

daemon是在后台运行的进程。Python的threading对daemon有更明确的含义,当程序退出时,daemon thread会自动关闭。

2.2 ThreadPoolExecutor

它称为ThreadPoolExecutor,并且是concurrent.futures(自Python 3.2起)标准库的一部分。

  1. import concurrent.futures
  2. # [rest of code]
  3. if __name__ == "__main__":
  4. format = "%(asctime)s: %(message)s"
  5. logging.basicConfig(format=format, level=logging.INFO,
  6. datefmt="%H:%M:%S")
  7. with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
  8. executor.map(thread_function, range(3))


注意:使用aThreadPoolExecutor可能会导致一些令人困惑的错误。 例如,如果调用不带参数的函数,但在.map()中将参数传递给,则线程将引发异常。 不幸的是,ThreadPoolExecutor将隐藏该异常,并且(在上述情况下)该程序将终止,没有任何输出。一开始调试起来可能会很混乱

2.3 竞争条件


借助 threading.Lock()进行线程上锁,并通过with做上下文管理。

  1. class FakeDatabase:
  2. def __init__(self):
  3. self.value = 0
  4. self._lock = threading.Lock()
  5. def locked_update(self, name):
  6. logging.info("Thread %s: starting update", name)
  7. logging.debug("Thread %s about to lock", name)
  8. with self._lock:
  9. logging.debug("Thread %s has lock", name)
  10. local_copy = self.value
  11. local_copy += 1
  12. time.sleep(0.1)
  13. self.value = local_copy
  14. logging.debug("Thread %s about to release lock", name)
  15. logging.debug("Thread %s after release", name)
  16. logging.info("Thread %s: finishing update", name)


  1. from multiprocessing import Process, Queue
  2. import os, time, random
  3. #TODO: 写数据进程执行的代码
  4. def write(q):
  5. print('Process to write. {}'.format(os.getpid()))
  6. for value in ['A', 'B', 'C']:
  7. print('Put {} to queue...'.format(value))
  8. q.put(value)
  9. time.sleep(random.random())
  10. #TODO: 读数据进程执行的代码
  11. def read(q):
  12. print('Process to read. {}'.format(os.getpid()))
  13. while True:
  14. value = q.get(True)
  15. print('Get {} from queue.'.format(value))
  16. if __name__ == "__main__":
  17. # 父进程创建Queue,并传给各个子进程
  18. q = Queue()
  19. pw = Process(target=write, args=(q,))
  20. pr = Process(target=read, args=(q,))
  21. # 启动子进程
  22. pw.start()
  23. pr.start()
  24. pw.join()
  25. pr.terminate()

3. Context Manager


  1. f = open('hello.txt', 'w')
  2. try:
  3. f.write('hello, world')
  4. finally:
  5. f.close()


  1. with open('hello.txt', 'w') as f:
  2. f.write('hello, world!')


  1. some_lock = threading.Lock()
  2. # Harmful:
  3. some_lock.acquire()
  4. try:
  5. # Do something...
  6. finally:
  7. some_lock.release()
  8. # Better:
  9. with some_lock:
  10. # Do something...

3.1 写一个自己的context manager

针对context manager,写 enterexit 两个方法后的目标,可以作为context manager。

  1. class ManagedFile:
  2. def __init__(self, name):
  3. self.name = name
  4. def __enter__(self):
  5. self.file = open(self.name, 'w')
  6. return self.file
  7. def __exit__(self, exc_type, exc_val, exc_tb):
  8. if self.file:
  9. self.file.close()
  1. with ManagedFile('hello.txt') as f:
  2. f.write('hello, world!')
  3. f.write('bye now')

基于类的contexxt manger并不是唯一的方法,标准库中的 contextlib utility 模块提供了一些更加抽象的context manager protocol,便于更好地创建context manager

  1. @contextmanager
  2. def managed_file(name):
  3. try:
  4. f = open(name, 'w')
  5. yield f
  6. finally:
  7. f.close()
  8. >>> with managed_file('hello.txt') as f:
  9. ... f.write('hello, world!')
  10. ... f.write('bye now')

4. Generator


  1. L = [x * x for x in range(10)]
  2. # generator
  3. g = (x * x for x in range(10))

如果要打印g的每一个元素,可以使用 next函数逐一获取。




  1. def fib(max):
  2. n, a, b = 0, 0, 1
  3. while n < max:
  4. yield b
  5. a, b = b, a + b
  6. n = n + 1
  7. return 'done'
  8. g = fib(6)
  9. while True:
  10. try:
  11. x = next(g)
  12. print('g:', x)
  13. except StopIteration as e:
  14. print('Generator return value:', e.value)
  15. break