【python异步编程系列4】基于python3.5+的异步编程

【python异步编程系列4】基于python3.5+的异步编程 - 图1

这篇文章是Python异步编程系列文章中的一篇

整个系列如下:

在这篇文章中我们将讨论到目前为止我们谈到的Python栈的概念:从最简单的线程、进程,到asyncio库。

异步编程最近在Python中越来越流行。在Python中有很多做异步编程的库。其中一个库asyncio已经被收录到了Python3.4的官方库中。在Python3.5中我们有了async/await语法。异步编程在Python中越来越流行的部分原因就是Asyncio。这篇文章将解释什么是异步编程,并且对其中的一些库进行比较。


导读

我们从之前的文章中所讲的了解到:

  • 同步: 阻塞操作。

  • 异步: 非阻塞操作。

  • 并发: 使程序同时被运行

  • 并行: 使程序并行被执行

  • 并行意味着并发,但并发并不一定意味着并行


Python现在主要运行方式有两种,同步和异步。你需要把他们当成两个分开的方式,他们的库和调用方式不一样,但是变量和语法是相同的。

Python的同步方式已经存在了几十年,你直接调用函数,所有的程序都按照你写的代码按顺序处理。这里有几种方式可以并行的运行代码。

Synchronous world

同步的世界

在这篇文章中我们将比较同样代码的不同实现方式。我们会尝试执行两个函数,第一个是对数字进行幂运算:

  1. def cpu_bound(a, b):
  2. return a ** b

我们将对这个方法调用N次:

  1. def simple_1(N, a, b):
  2. for i in range(N):
  3. cpu_bound(a, b)

第二个是在网上下载数据:

  1. def io_bound(urls):
  2. data = []
  3. for url in urls:
  4. data.append(urlopen(url).read())
  5. return data
  6. def simple_2(N, urls):
  7. for i in range(N):
  8. io_bound(urls)

为了比较函数的运行时间,我们实现了简单的装饰器/上下文管理器:

  1. import time
  2. from contextlib import ContextDecorator
  3. class timeit(object):
  4. def __call__(self, f):
  5. @functools.wraps(f)
  6. def decorated(*args, **kwds):
  7. with self:
  8. return f(*args, **kwds)
  9. return decorated
  10. def __enter__(self):
  11. self.start_time = time.time()
  12. def __exit__(self, *args, **kw):
  13. elapsed = time.time() - self.start_time
  14. print("{:.3} sec".format(elapsed))

现在让我们将所有代码放在一起运行一下,来看看你的机器执行这些代码需要多长时间:

  1. import time
  2. import functools
  3. from urllib.request import urlopen
  4. from contextlib import ContextDecorator
  5. class timeit(object):
  6. def __call__(self, f):
  7. @functools.wraps(f)
  8. def decorated(*args, **kwds):
  9. with self:
  10. return f(*args, **kwds)
  11. return decorated
  12. def __enter__(self):
  13. self.start_time = time.time()
  14. def __exit__(self, *args, **kw):
  15. elapsed = time.time() - self.start_time
  16. print("{:.3} sec".format(elapsed))
  17. def cpu_bound(a, b):
  18. return a ** b
  19. def io_bound(urls):
  20. data = []
  21. for url in urls:
  22. data.append(urlopen(url).read())
  23. return data
  24. @timeit()
  25. def simple_1(N, a, b):
  26. for i in range(N):
  27. cpu_bound(a, b)
  28. @timeit()
  29. def simple_2(N, urls):
  30. for i in range(N):
  31. io_bound(urls)
  32. if __name__ == '__main__':
  33. a = 7777
  34. b = 200000
  35. urls = [
  36. "http://google.com",
  37. "http://yahoo.com",
  38. "http://linkedin.com",
  39. "http://facebook.com"
  40. ]
  41. simple_1(10, a, b)
  42. simple_2(10, urls)

我们将这些函数同时运行了N次。

在我的硬件条件上,计算密集型函数花了2.18秒,I/O密集型用了31.4秒。

所以,我们得到了基本的性能标准。我们现在来了解一下线程。

Threads

线程

【python异步编程系列4】基于python3.5+的异步编程 - 图2

线程是操作系统中能够进行运算调度的最小单元。

一个进程中的线程可以共享内存中的全局变量。如果全局变量在一个线程中被修改,这个修改在所有线程中都有效。

简单来说,线程是在程序中一系列操作,可以独立于其他代码运行。

线程是并发执行的,也可以并行执行,这就依赖于在什么系统上运行。

Python线程是通过系统线程实现的,我知道的Python线程实现的解释器(CPython, PyPy和Jython)。对于每个Python线程,都有一个系统线程。

每单位时间在一个处理器核心上执行一个线程。它一直工作,直到分给它的时间用完(默认为100毫秒),或直到因为系统调用放弃对下一个线程的控制。

我们试着通过线程来实现示例:

  1. from threading import Thread
  2. @timeit()
  3. def threaded(n_threads, func, *args):
  4. jobs = []
  5. for i in range(n_threads):
  6. thread = Thread(target=func, args=args)
  7. jobs.append(thread)
  8. # start the threads
  9. for j in jobs:
  10. j.start()
  11. # ensure all of the threads have finished
  12. for j in jobs:
  13. j.join()
  14. if __name__ == '__main__':
  15. ...
  16. threaded(10, cpu_bound, a, b)
  17. threaded(10, io_bound, urls)

在我的硬件上,计算密集型函数花了2.47秒,I/O密集型用了7.9秒。

I/O密集型函数执行时间快了5倍,因为我们在不同的线程上并行下载数据。但是为什么计算密集型运行更慢了呢?

这里实现Python的解析器用的是CPython,它有一个臭名昭著的GIL(Global Interpreter Lock)。我们先来了解一下GIL。。。

全局解释器锁(GIL)

首先,GIL是一个锁,在访问Python之前必须带GIL(且不仅仅是执行Python代码,调用Python API时也是)。本质上,GIL是一个全局信号量,GIL不允许多个线程在一个解释器中同时工作。

严格地说,在使用未捕获的GIL运行解释器之后,唯一可用的调用是它的捕获。违反规则会导致即时崩溃(最好的情况)或程序延迟崩溃(更糟糕且更难调试)。

它是怎么工作的呢?

当线程开启时,会执行一个GIL捕获。一段时间后,进程调度程序认为当前线程已经做完了,然后将控制权给下一个线程。线程#2看到GIL已经被捕获就不会继续工作,而是进入休眠,把进程让给线程#1.

但是线程不能一直占着GIL。在Python 3.3之前,每100个机器代码指令GIL切换一次。在后面的GIL版本,一个线程不能超过5毫秒。如果线程进行系统调用,或使用磁盘或网络(I​​/O绑定操作),也会释放GIL。

事实上,Python中通过使用线程解决计算机(CPU-绑定操作)并行的问题的想法,因为 GIL 的存在而失效了。即使在多线程系统也是同步运行。在计算密集型任务中,程序运行不会加快,反而会变慢,因为线程减半了进程的时间,同时,I/O操作不会变慢,因为系统调用线程释放了GIL。

很明显,GIL让我们的程序执行速度变慢的原因是,由于创建和传递线程,捕获和释放信号以及保留上下文这些额外的工作。但是需要提到的是,GIL名没有限制并行执行。

GIL不是一种Python语言的一部分,不是每种实现都存在该问题。只是在 CPython 中被用到。

所以它为什么会存在呢?

GIL保护数据结构操作免受并发访问问题的影响。比如,当对象的引用计算的数值发生改变是,它可以防止竞争条件。GIL使得可以很容易地在C上集成非线程安全库。感谢有GIL,我们可以有那么多的快速模块和绑定器。

也有例外,GIL控制器可以直接用于C库。例如,NumPy在整型操作中可以释放GIL。或者,当使用numba包时,程序可以控制关掉信号。

在这段悲伤的说明中,你可以得出结论是,线程适用于在I/O密集型中处理并行任务。但是计算任务应该被放在不同的进程中执行。

进程

从系统角度来看,进程是一个数据结构有一个内存空间和一些资源,比如,需要一个进程打开文件。通常一个进程有一个线程,称为主线程,不过程序可以创建任意数目的线程。在一开始,线程是不可分配的独立资源,它会用进程的内存和资源。因为这个原因,线程可以很快的开始和结束。

多任务由调度程序控制,调度程序是操作系统内核之一,会将执行线程加载到中央处理器中。

像线程一样,进程通常也是并发执行,且也可以并行,这取决于当前的硬件条件。

  1. from multiprocessing import Process
  2. @timeit()
  3. def multiprocessed(n_threads, func, *args):
  4. processes = []
  5. for i in range(n_threads):
  6. p = Process(target=func, args=args)
  7. processes.append(p)
  8. # start the processes
  9. for p in processes:
  10. p.start()
  11. # ensure all processes have finished execution
  12. for p in processes:
  13. p.join()
  14. if __name__ == '__main__':
  15. ...
  16. multiprocessed(10, cpu_bound, a, b)
  17. multiprocessed(10, io_bound, urls)

在我的硬件中,计算密集型花费1.12秒,I/O密集型花费7.22秒。

因此,计算操作比线程实现执行得更快,因为现在我们并没有停留在捕获GIL,但是I/O密集型函数花费的时间更多,因为进程比线程更重。

Asynchronous world

异步的世界

【python异步编程系列4】基于python3.5+的异步编程 - 图3

在异步的世界,所有的事情都发生了一些改变。所有的任务都在一个中央事件处理环中,事件循环是一段很小的代码,它允许你同时做几个协程(这是一个重要的术语,简单的来说这不是系统管理线程,它们是多任务合作,但并不是真正的并发)。协程工作是同步运行的,直到得到期望的结果,然后工作结束,控制权转给了事件循环,接着再做其他的事。

Green threads

绿色线程

【python异步编程系列4】基于python3.5+的异步编程 - 图4

绿色线程是异步编程的基础层。绿色线程与常规线程一样,除了它的线程切换实现是在应用程序中(用户层),而不是处理器(操作系统层)。它的核心是非阻塞操作。线程之间的切换仅在I/O上发生。非I/O线程将一直享有控制权。

Gevent是一个目前非常广泛的绿色线程以及非阻塞的Python包。gevent.monkey方法改变了标准Python包,使得其可以允许非阻塞操作。

Other libraries:

其他的库:

让我们来看看在Python中通过使用gevent来使用绿色线程的效果是怎么样的吧:

  1. import gevent.monkey
  2. # patch any other imported module that has a blocking code in it
  3. # to make it asynchronous.
  4. gevent.monkey.patch_all()
  5. @timeit()
  6. def green_threaded(n_threads, func, *args):
  7. jobs = []
  8. for i in range(n_threads):
  9. jobs.append(gevent.spawn(func, *args))
  10. # ensure all jobs have finished execution
  11. gevent.wait(jobs)
  12. if __name__ == '__main__:
  13. ...
  14. green_threaded(10, cpu_bound, a, b)
  15. green_threaded(10, io_bound, urls)

结果:计算密集型 - 2.23秒,I/O密集型 - 6.85秒

如期望中的一样,计算密集型更慢了,I/O密集型更快了。

Asyncio

Asyncio

Asyncio在Python中是用来编写并行代码的库。然而,Asyncio并不是多线程和多进程的。它不是为了多线程和多进程而设计的。

Gevent和Twisted的目标是作为高层级的框架。而Asyncio的目的是异步事件循环的低层级实现,从而让其他高层级框架(Twisted, Gevent和Tornado等)可以基于它而构建。 然而,就Asyncio本身而言,它就是一个很好的框架。

事实上,Asyncio是一个处理多任务的单线程和单进程项目。Asyncio可以让我们在同一个线程上运行异步并发程序,通过事件循环来处理任务,通过sockets(或其他的资源)来多路复用I/O.

Asyncio提供很多有用的API,包括事件循环。事件循环追踪不同的I/O事件,让那些已经准备好的任务开始执行,而将还在I/O上等待的任务暂停。

How it works

怎么工作的呢?

同步和异步的方法或调用是不同的,你能不将他们混为一谈。如果你要阻塞了一个同步的协程,你要用time.sleep(10)而不是await asyncio.sleep(10),你不需要将控制返回给事件循环,全部的进程都会被阻塞。

你要考虑你的代码既有同步代码也有异步代码,任何在异步方法中的代码都是异步的,其他(包括Python文件或者类的主函数)是同步代码。

【python异步编程系列4】基于python3.5+的异步编程 - 图5

这样想比较简单。这有一个事件循环。然后我们将Python中的一个异步方法(协程)声明为一个async def,函数的调用方式就发生了改变。特别是,调用这个函数会很快返回一个协程对象,它的意思是说“我可以运行协程,当你await的时候返回一个结果”。

我们将这些方法交给事件循环,让它帮我们去运行这些函数。事件循环会给我们返回一个Future对象,这就像一个保证告诉我们未来会把结果返回给我们。我们拿着这个保证,一次一次的去查看它有没有返回的结果(当我们失去耐心时),终于返回值来了,我们可以用返回的结果再进行其他的操作。

当你调用await时,当前方法执行会暂停下来,等待你要等的东西,当等待结束后,事件循环将再次唤醒函数从await调用中恢复,并将await到的结果返回。

Example: 示例:

  1. import asyncio
  2. async def say(what, when):
  3. await asyncio.sleep(when)
  4. print(what)
  5. loop = asyncio.get_event_loop()
  6. loop.run_until_complete(say('hello world', 1))
  7. loop.close()

在这个例子中,say()方法暂停运行并把控制交给事件循环,事件循环看到要运行sleep方法并调用它,然后整个调用进入等待,并且标记在一秒钟后恢复。一旦恢复,会通知事件循环完成并返回结果,然后让主程序准备继续运行,事件循环恢复了主程序并返回结果。

这就是异步代码为什么可以同时发生那么多事情的原因——任何方法阻塞当前调用使其进入等待,然后被放入事件循环的暂停协程列表,这样其他的程序就能够运行。每个被暂停的程序都有一个相关的回调,用于未来将它唤醒,而这些程序被暂停有的是因为运行时间,有些是因为I/O操作,且大多数都像上面的例子一样,需要等待另一个协程的结果。

让我们回到刚刚的例子。我们有两个阻塞的函数,计算密集型和I/O密集型。就像我说的那样,我们不能把同步和异步操作混在一起用——而是必须把所有操作都变成异步的。当然,不是所有东西都有异步库。有些代码仍然是阻塞,必须以某种方式让它运行,不然会阻塞事件循环。这个问题的解决方法是,使用事件循环对象的 run_in_executor()方法,它会在线程池中新建一个线程来运行,而不会阻塞事件循环中的主线程。我们将在计算密集型方法中使用这个功能。我们也会完全重写I/O密集型方法,使其await那些我们需要等待的事件。

  1. import asyncio
  2. import aiohttp
  3. async def async_func(N, func, *args):
  4. coros = [func(*args) for _ in range(N)]
  5. # run awaitable objects concurrently
  6. await asyncio.gather(*coros)
  7. async def a_cpu_bound(a, b):
  8. result = await loop.run_in_executor(None, cpu_bound, a, b)
  9. return result
  10. async def a_io_bound(urls):
  11. # create a coroutine function where we will download from individual url
  12. async def download_coroutine(session, url):
  13. async with session.get(url, timeout=10) as response:
  14. await response.text()
  15. # set an aiohttp session and download all our urls
  16. async with aiohttp.ClientSession(loop=loop) as session:
  17. for url in urls:
  18. await download_coroutine(session, url)
  19. if __name__ == '__main__':
  20. ...
  21. loop = asyncio.get_event_loop()
  22. with timeit():
  23. loop.run_until_complete(async_func(10, a_cpu_bound, a, b))
  24. with timeit():
  25. loop.run_until_complete(async_func(10, a_io_bound, urls))

结果:计算密集型 - 2.23秒,I/O密集型 - 4.37秒。

计算密集型慢了,而I/O密集型比线程的例子快了近两倍。

Making the Right Choice

做一个正确的选择

  • CPU-bound -> multiprocessing
  • I/O-bound, fast I/O, Limited Number of Connections -> multithreading
  • I/O-bound, slow I/O, many connections -> asyncio

  • 计算密集型 -> 多进程

  • I/O密集型,I/O比较快,连接数有限制 -> 多线程
  • I/O密集型,I/O比较慢,连接数比较多 -> 异步

Conclusion

结论

如果是一个不依赖于外部服务的典型的Web应用,并且相关的有限数量的用户的响应时间可以预见是比较短的,那么使用线程会比较容易。

异步适用于需要花大部分时间进行数据读写而不是处理的操作的应用,比如,你有大量慢请求 - websockets,长轮询,或者是一些比较慢的外部后端同步操作,那些不知道什么时候结束的请求。

同步编程是那些连续执行的命令的应用的开发中最常用的。

即使在条件分支,循环,和函数调用中,我们也会考虑一次执行一个步骤,完成当前步骤后再进入下一步。

异步应用的表现是不一样的。它仍然是一次执行一个步骤,不同的地方在于系统会继续向前运行,不会等待当前执行结束。结果就是,我们将进行事件驱动的编程

asyncio是一个很好用的库,已经被放进Python的标准库里了。asyncio已经开始为应用开发建造一个系统了(aiohttp,asyncpg,等)。这里也有一些其他的事件循环的实现(uvloopdabeaz/curiopython-trio/trio)。我认为asyncio将会发展成一个很厉害的工具。

Links

链接