Gevent 中文入门教程

核心

Greenlets

gevent中使用的主要模式是 Greenlet,这是作为C扩展模块提供给 Python 的一个轻量级协同程序。所有 greenlet 都在主程序的 OS 进程中运行,但它们是协同调度的。

在任何给定的时间内,只有一个greenlet在运行。

这不同于由 multiprocessing 或 threading 库提供的并行结构,它们这些库可以自旋进程和POSIX线程,由操作系统调度,并且真正并行的。

异步和同步执行

并发性的核心思想是,可以将较大的任务分解为多个子任务的集合,这些子任务计划同时或异步运行,而不是一次或同步运行。两个子任务之间的切换称为上下文切换。

gevent中的上下文切换是通过 yielding 来完成的。在本例中,我们有两个上下文,它们通过调用 gevent.sleep(0) 相互让步。

  1. import gevent
  2. def foo():
  3. print('Running in foo')
  4. gevent.sleep(0)
  5. print('Explicit context switch to foo again')
  6. def bar():
  7. print('Explicit context to bar')
  8. gevent.sleep(0)
  9. print('Implicit context switch back to bar')
  10. gevent.joinall([
  11. gevent.spawn(foo),
  12. gevent.spawn(bar),
  13. ])
  1. Running in foo
  2. Explicit context to bar
  3. Explicit context switch to foo again
  4. Implicit context switch back to bar

当我们将 gevent 用于网络和 IO 绑定函数时,它的真正威力就来了,这些函数可以协同调度。Gevent 负责处理所有细节,以确保网络库尽可能隐式地让步出它们的 greenlet 上下文。我再怎么强调这是一个多么有力的成语也不为过。但也许可以举个例子来说明。

在这种情况下,select() 函数通常是一个阻塞调用,它轮询各种文件描述符。

  1. import time
  2. import gevent
  3. from gevent import select
  4. start = time.time()
  5. tic = lambda: 'at %1.1f seconds' % (time.time() - start)
  6. def gr1():
  7. # Busy waits for a second, but we don't want to stick around...
  8. print('Started Polling: %s' % tic())
  9. select.select([], [], [], 2)
  10. # 可以理解成一个 IO 阻塞的操作,阻塞了2秒,这时 Greenlet 会自动切换到 gr2() 上下文执行
  11. print('Ended Polling: %s' % tic())
  12. def gr2():
  13. # Busy waits for a second, but we don't want to stick around...
  14. print('Started Polling: %s' % tic())
  15. select.select([], [], [], 2)
  16. print('Ended Polling: %s' % tic())
  17. def gr3():
  18. print("Hey lets do some stuff while the greenlets poll, %s" % tic())
  19. gevent.sleep(1)
  20. # 让当前 Greenlet 休眠1秒,上面 gr1() gr2() 阻塞操作完成后,再切换到 gr1() gr2() 的上下文执行
  21. gevent.joinall([
  22. gevent.spawn(gr1),
  23. gevent.spawn(gr2),
  24. gevent.spawn(gr3),
  25. ])
  1. Started Polling: at 0.0 seconds
  2. Started Polling: at 0.0 seconds
  3. Hey lets do some stuff while the greenlets poll, at 0.0 seconds
  4. Ended Polling: at 2.0 seconds
  5. Ended Polling: at 2.0 seconds

另一个比较综合的例子定义了一个不确定的任务函数 (它的输出不能保证对相同的输入给出相同的结果) 。在这种情况下,运行该函数的副作用是任务暂停执行的时间是随机的。

  1. import gevent
  2. import random
  3. def task(pid):
  4. """
  5. Some non-deterministic task
  6. """
  7. gevent.sleep(random.randint(0,2)*0.001)
  8. print('Task %s done' % pid)
  9. def synchronous():
  10. for i in range(1,10):
  11. task(i)
  12. def asynchronous():
  13. threads = [gevent.spawn(task, i) for i in xrange(10)]
  14. gevent.joinall(threads)
  15. print('Synchronous:')
  16. synchronous()
  17. print('Asynchronous:')
  18. asynchronous()
  1. Synchronous:
  2. Task 1 done
  3. Task 2 done
  4. Task 3 done
  5. Task 4 done
  6. Task 5 done
  7. Task 6 done
  8. Task 7 done
  9. Task 8 done
  10. Task 9 done
  11. Asynchronous:
  12. Task 1 done
  13. Task 5 done
  14. Task 6 done
  15. Task 2 done
  16. Task 4 done
  17. Task 7 done
  18. Task 8 done
  19. Task 9 done
  20. Task 0 done
  21. Task 3 done

在同步情况下,所有任务都是按顺序运行的,这会导致在执行每个任务时主程序阻塞(即暂停主程序的执行)。

程序的重要部分是 gevent.spawn,它将给定的函数封装在一个 Greenlet 线程中。初始化的 greenlet 列表存储在传递给 gevent 的数组线程中。gevent.joinall 函数,它会阻塞当前程序,来运行所有给定的 greenlet。只有当所有 greenlet 终止时,执行才会继续进行。

需要注意的是,异步情况下的执行顺序本质上是随机的,异步情况下的总执行时间比同步情况下少得多。实际上,同步的例子完成的最大时间是每个任务停顿0.002秒,导致整个队列停顿0.02秒。在异步情况下,最大运行时间大约为0.002秒,因为没有一个任务会阻塞其他任务的执行。

在更常见的用例中,异步地从服务器获取数据,fetch() 的运行时在请求之间会有所不同,这取决于请求时远程服务器上的负载。

  1. import gevent.monkey
  2. gevent.monkey.patch_socket()
  3. # 把内置的阻塞的 socket替换成非阻塞的socket
  4. import gevent
  5. import urllib2
  6. import simplejson as json
  7. def fetch(pid):
  8. response = urllib2.urlopen('http://json-time.appspot.com/time.json')
  9. result = response.read()
  10. json_result = json.loads(result)
  11. datetime = json_result['datetime']
  12. print('Process %s: %s' % (pid, datetime))
  13. return json_result['datetime']
  14. def synchronous():
  15. for i in range(1,10):
  16. fetch(i)
  17. def asynchronous():
  18. threads = []
  19. for i in range(1,10):
  20. threads.append(gevent.spawn(fetch, i))
  21. gevent.joinall(threads)
  22. print('Synchronous:')
  23. synchronous()
  24. print('Asynchronous:')
  25. asynchronous()

确定性

如前所述,greenlet 是确定的。给定相同的 greenlet 配置和相同的输入集,它们总是产生相同的输出。例如,让我们将一个任务分散到一个多进程(multiprocessing)池中,并将其结果与一个gevent池的结果进行比较。

  1. import time
  2. def echo(i):
  3. time.sleep(0.001)
  4. return i
  5. # Non Deterministic Process Pool
  6. from multiprocessing.pool import Pool
  7. p = Pool(10)
  8. run1 = [a for a in p.imap_unordered(echo, xrange(10))] # imap_unordered 返回一个顺序随机的 iterable 对象
  9. run2 = [a for a in p.imap_unordered(echo, xrange(10))]
  10. run3 = [a for a in p.imap_unordered(echo, xrange(10))]
  11. run4 = [a for a in p.imap_unordered(echo, xrange(10))]
  12. print(run1 == run2 == run3 == run4)
  13. # Deterministic Gevent Pool
  14. from gevent.pool import Pool
  15. p = Pool(10)
  16. run1 = [a for a in p.imap_unordered(echo, xrange(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  17. run2 = [a for a in p.imap_unordered(echo, xrange(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  18. run3 = [a for a in p.imap_unordered(echo, xrange(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  19. run4 = [a for a in p.imap_unordered(echo, xrange(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  20. print(run1 == run2 == run3 == run4)
  1. False
  2. True

尽管 gevent 通常是确定的,但当您开始与外部服务(如 socket 和文件)进行交互时,非确定性的来源可能会潜入您的程序。因此,即使 green 线程是确定性并发的一种形式,它们仍然会遇到POSIX线程和进程遇到的一些相同的问题。

与并发有关的长期问题称为竞争条件。简而言之,当两个并发线程/进程依赖于某些共享资源但还试图修改该值时,就会发生竞争状态。这将导致资源的值变得依赖于执行顺序。这是一个问题,一般来说,应该尽量避免竞态条件,因为它们会导致全局的不确定程序行为。

最好的方法是在任何时候都避免所有全局状态。全局状态和导入时间的副作用总是会回来咬你一口

生成 Greenlets

gevent提供了一些关于Greenlet初始化的包装器。一些最常见的模式是:

  1. import gevent
  2. from gevent import Greenlet
  3. def foo(message, n):
  4. """
  5. Each thread will be passed the message, and n arguments
  6. in its initialization.
  7. """
  8. gevent.sleep(n)
  9. print(message)
  10. # Initialize a new Greenlet instance running the named function
  11. # foo
  12. thread1 = Greenlet.spawn(foo, "Hello", 1)
  13. # Wrapper for creating and running a new Greenlet from the named
  14. # function foo, with the passed arguments
  15. thread2 = gevent.spawn(foo, "I live!", 2)
  16. # Lambda expressions
  17. thread3 = gevent.spawn(lambda x: (x+1), 2)
  18. threads = [thread1, thread2, thread3]
  19. # Block until all threads complete.
  20. gevent.joinall(threads)
  1. Hello
  2. I live!

除了使用基本的Greenlet类,您还可以子类化 Greenlet 类并覆盖 _run 方法。

  1. import gevent
  2. from gevent import Greenlet
  3. class MyGreenlet(Greenlet):
  4. def __init__(self, message, n):
  5. Greenlet.__init__(self)
  6. self.message = message
  7. self.n = n
  8. def _run(self):
  9. print(self.message)
  10. gevent.sleep(self.n)
  11. g = MyGreenlet("Hi there!", 3)
  12. g.start()
  13. g.join()
  1. Hi there!

Greenlets 状态

与代码的其他部分一样,greenlet可能以各种方式失败。greenlet可能无法抛出异常、无法停止或消耗太多系统资源。

greenlet 的内部状态通常是一个与时间相关的参数。在greenlets上有许多标志,它们允许您监视线程的状态:

started — 布尔值,指示Greenlet是否已启动

ready() — 布尔值,指示Greenlet是否已停止

successful() — 布尔值,指示Greenlet是否已停止且没有抛出异常

value — Greenlet返回的值

exception — 异常,在greenlet中抛出的未捕获异常实例

  1. import gevent
  2. def win():
  3. return 'You win!'
  4. def fail():
  5. raise Exception('You fail at failing.')
  6. winner = gevent.spawn(win)
  7. loser = gevent.spawn(fail)
  8. print(winner.started) # True
  9. print(loser.started) # True
  10. # Exceptions raised in the Greenlet, stay inside the Greenlet.
  11. try:
  12. gevent.joinall([winner, loser])
  13. except Exception as e:
  14. print('This will never be reached')
  15. print(winner.value) # 'You win!'
  16. print(loser.value) # None
  17. print(winner.ready()) # True
  18. print(loser.ready()) # True
  19. print(winner.successful()) # True
  20. print(loser.successful()) # False
  21. # The exception raised in fail, will not propagate outside the
  22. # greenlet. A stack trace will be printed to stdout but it
  23. # will not unwind the stack of the parent.
  24. print(loser.exception)
  25. # It is possible though to raise the exception again outside
  26. # raise loser.exception
  27. # or with
  28. # loser.get()
  1. True
  2. True
  3. You win!
  4. None
  5. True
  6. True
  7. True
  8. False
  9. You fail at failing.

程序关闭

当主程序接收到SIGQUIT时,无法生成(yield)的 greenlet 可能会使程序的执行时间比预期的更长。这将导致所谓的“僵尸进程”,需要从 Python 解释器外部杀死这些进程。

一种常见的模式是监听主程序上的SIGQUIT事件并在退出前调用 gevent.shutdown 。

  1. import gevent
  2. import signal
  3. def run_forever():
  4. gevent.sleep(1000)
  5. if __name__ == '__main__':
  6. gevent.signal(signal.SIGQUIT, gevent.kill)
  7. thread = gevent.spawn(run_forever)
  8. thread.join()

超时

超时是对代码块或Greenlet的运行时的约束。

  1. import gevent
  2. from gevent import Timeout
  3. seconds = 10
  4. timeout = Timeout(seconds)
  5. timeout.start()
  6. def wait():
  7. gevent.sleep(10)
  8. try:
  9. gevent.spawn(wait).join()
  10. except Timeout:
  11. print('Could not complete')

在with语句中,它们还可以与上下文管理器一起使用。

  1. import gevent
  2. from gevent import Timeout
  3. time_to_wait = 5 # seconds
  4. class TooLong(Exception):
  5. pass
  6. with Timeout(time_to_wait, TooLong):
  7. gevent.sleep(10)

此外,gevent 还为各种 Greenlet 和数据结构相关的调用提供超时参数。例如:

  1. import gevent
  2. from gevent import Timeout
  3. def wait():
  4. gevent.sleep(2)
  5. timer = Timeout(1).start()
  6. thread1 = gevent.spawn(wait)
  7. try:
  8. thread1.join(timeout=timer)
  9. except Timeout:
  10. print('Thread 1 timed out')
  11. # --
  12. timer = Timeout.start_new(1)
  13. thread2 = gevent.spawn(wait)
  14. try:
  15. thread2.get(timeout=timer)
  16. except Timeout:
  17. print('Thread 2 timed out')
  18. # --
  19. try:
  20. gevent.with_timeout(1, wait)
  21. except Timeout:
  22. print('Thread 3 timed out')
  1. Thread 1 timed out
  2. Thread 2 timed out
  3. Thread 3 timed out

猴子补丁

我们来到了Gevent的黑暗角落。到目前为止,我一直避免提到monkey patching,以尝试和激发强大的协同模式,但是现在是讨论monkey patching的黑魔法的时候了。 如果您注意到上面我们调用了命令 monkey.patch_socket(),这是一个纯粹用于修改标准库套接字库(socket)的副作用命令。

  1. import socket
  2. print(socket.socket)
  3. print("After monkey patch")
  4. from gevent import monkey
  5. monkey.patch_socket()
  6. print(socket.socket)
  7. import select
  8. print(select.select)
  9. monkey.patch_select()
  10. print("After monkey patch")
  11. print(select.select)
  1. class 'socket.socket'
  2. After monkey patch
  3. class 'gevent.socket.socket'
  4. built-in function select
  5. After monkey patch
  6. function select at 0x1924de8

Python 允许在运行时修改大多数对象,包括模块、类甚至函数。这通常是一个令人震惊的坏主意,因为它创建了一个“隐式副作用”,如果出现问题,通常非常难以调试,然而在极端情况下,库需要改变Python本身的基本行为,可以使用monkey补丁。在这种情况下,gevent能够修补标准库中的大多数阻塞系统调用,包括 socket、ssl、threading 和 select 模块中的调用。

例如,Redis-python 的绑定通常使用常规tcp socket与Redis-server实例通信。只需调用gevent.monkey.patch_all(),我们可以让redis绑定协同调度请求,并与gevent堆栈的其他部分一起工作。

这让我们可以在不编写任何代码的情况下集成通常无法与gevent一起工作的库。(尽管猴子补丁仍然是邪恶的,但在这种情况下,它是“有用的邪恶”。)

数据结构

事件

事件是greenlet之间异步通信的一种形式。

  1. import gevent
  2. from gevent.event import Event
  3. '''
  4. Illustrates the use of events
  5. '''
  6. evt = Event()
  7. def setter():
  8. '''After 3 seconds, wake all threads waiting on the value of evt'''
  9. print('A: Hey wait for me, I have to do something')
  10. gevent.sleep(3)
  11. print("Ok, I'm done")
  12. evt.set() # 运行到evt.set()会将flag设置为True,然后另外两个被阻塞的waitter的evt.wait()方法在看到flag已经为True之后不再继续阻塞运行并且结束。
  13. def waiter():
  14. '''After 3 seconds the get call will unblock'''
  15. print("I'll wait for you")
  16. evt.wait() # blocking
  17. print("It's about time")
  18. def main():
  19. gevent.joinall([
  20. gevent.spawn(setter),
  21. gevent.spawn(waiter),
  22. gevent.spawn(waiter),
  23. gevent.spawn(waiter),
  24. gevent.spawn(waiter),
  25. gevent.spawn(waiter)
  26. ])
  27. if __name__ == '__main__': main()

事件对象的扩展是 AsyncResult,它允许您在唤醒调用时发送一个值。这有时被称为future或deferred,因为它有对 future 值的引用,可以在任意的时间设置该值。

  1. import gevent
  2. from gevent.event import AsyncResult
  3. a = AsyncResult()
  4. def setter():
  5. """
  6. After 3 seconds set the result of a.
  7. """
  8. gevent.sleep(3)
  9. a.set('Hello!')
  10. def waiter():
  11. """
  12. After 3 seconds the get call will unblock after the setter
  13. puts a value into the AsyncResult.
  14. """
  15. print(a.get())
  16. gevent.joinall([
  17. gevent.spawn(setter),
  18. gevent.spawn(waiter),
  19. ])

队列

队列是按顺序排列的数据集,它们具有通常的 put / get 操作,但可以在Greenlets上安全操作的方式编写。

例如,如果一个Greenlet从队列中获取一个项目(item),则同一项目(item)不会被同时执行的另一个Greenlet获取。

  1. import gevent
  2. from gevent.queue import Queue
  3. tasks = Queue()
  4. def worker(n):
  5. while not tasks.empty():
  6. task = tasks.get()
  7. print('Worker %s got task %s' % (n, task))
  8. gevent.sleep(0)
  9. print('Quitting time!')
  10. def boss():
  11. for i in xrange(1,25):
  12. tasks.put_nowait(i)
  13. gevent.spawn(boss).join()
  14. gevent.joinall([
  15. gevent.spawn(worker, 'steve'),
  16. gevent.spawn(worker, 'john'),
  17. gevent.spawn(worker, 'nancy'),
  18. ])
  1. Worker steve got task 1
  2. Worker john got task 2
  3. Worker nancy got task 3
  4. Worker steve got task 4
  5. Worker john got task 5
  6. Worker nancy got task 6
  7. Worker steve got task 7
  8. Worker john got task 8
  9. Worker nancy got task 9
  10. Worker steve got task 10
  11. Worker john got task 11
  12. Worker nancy got task 12
  13. Worker steve got task 13
  14. Worker john got task 14
  15. Worker nancy got task 15
  16. Worker steve got task 16
  17. Worker john got task 17
  18. Worker nancy got task 18
  19. Worker steve got task 19
  20. Worker john got task 20
  21. Worker nancy got task 21
  22. Worker steve got task 22
  23. Worker john got task 23
  24. Worker nancy got task 24
  25. Quitting time!
  26. Quitting time!
  27. Quitting time!

根据需要,队列还可以在put或get上阻塞。

每个put和get操作都有一个非阻塞的对应操作,put_nowait 和 get_nowait 不会阻塞。如果操作是不可能的,会引发 gevent.queue.Empty 或 gevent.queue.Full

在这个例子中,我们让boss同时和workers运行,并且对队列有一个限制,防止它包含三个以上的元素。这个限制意味着put操作将阻塞,直到队列上有空间为止。相反,如果队列上没有要获取的元素,get操作就会阻塞,它还会接受一个超时参数,如果在超时的时间范围内找不到工作(work),则允许队列以异常 gevent.queue.Empty 退出。

  1. import gevent
  2. from gevent.queue import Queue, Empty
  3. tasks = Queue(maxsize=3)
  4. def worker(name):
  5. try:
  6. while True:
  7. task = tasks.get(timeout=1) # decrements queue size by 1
  8. print('Worker %s got task %s' % (name, task))
  9. gevent.sleep(0)
  10. except Empty:
  11. print('Quitting time!')
  12. def boss():
  13. """
  14. Boss will wait to hand out work until a individual worker is
  15. free since the maxsize of the task queue is 3.
  16. """
  17. for i in xrange(1,10):
  18. tasks.put(i) # 输入1,2,3,到4时,队列达到最大值,put方法阻塞,gevent 切换到下一个协程worker(steve)
  19. print('Assigned all work in iteration 1')
  20. for i in xrange(10,20):
  21. tasks.put(i)
  22. print('Assigned all work in iteration 2')
  23. gevent.joinall([
  24. gevent.spawn(boss),
  25. gevent.spawn(worker, 'steve'),
  26. gevent.spawn(worker, 'john'),
  27. gevent.spawn(worker, 'bob'),
  28. ])
  1. Worker steve got task 1
  2. Worker john got task 2
  3. Worker bob got task 3
  4. Worker steve got task 4
  5. Worker john got task 5
  6. Worker bob got task 6
  7. Assigned all work in iteration 1
  8. Worker steve got task 7
  9. Worker john got task 8
  10. Worker bob got task 9
  11. Worker steve got task 10
  12. Worker john got task 11
  13. Worker bob got task 12
  14. Worker steve got task 13
  15. Worker john got task 14
  16. Worker bob got task 15
  17. Worker steve got task 16
  18. Worker john got task 17
  19. Worker bob got task 18
  20. Assigned all work in iteration 2
  21. Worker steve got task 19
  22. Quitting time!
  23. Quitting time!
  24. Quitting time!

分组和池

组是运行中的greenlet的集合,这些greenlet作为组一起管理和调度。它还兼做并行调度程序,借鉴 Python multiprocessing 库。

  1. import gevent
  2. from gevent.pool import Group
  3. def talk(msg):
  4. for i in xrange(3):
  5. print(msg)
  6. g1 = gevent.spawn(talk, 'bar')
  7. g2 = gevent.spawn(talk, 'foo')
  8. g3 = gevent.spawn(talk, 'fizz')
  9. group = Group()
  10. group.add(g1)
  11. group.join() # 修改了官方的例子,这里join只会让当前线程等待g1,但g2和g3已经被启动,会被继续安排执行
  1. bar
  2. bar
  3. bar
  4. foo
  5. foo
  6. foo
  7. fizz
  8. fizz
  9. fizz

这对于管理异步任务组非常有用。

如上所述,Group还提供了一个API,用于将作业分派给分组的greenlet并以各种方式收集它们的结果。

  1. import gevent
  2. from gevent import getcurrent
  3. from gevent.pool import Group
  4. group = Group()
  5. def hello_from(n):
  6. print('Size of group %s' % len(group))
  7. print('Hello from Greenlet %s' % id(getcurrent()))
  8. group.map(hello_from, xrange(3))
  9. def intensive(n):
  10. gevent.sleep(3 - n)
  11. return 'task', n
  12. print('Ordered')
  13. ogroup = Group()
  14. for i in ogroup.imap(intensive, xrange(3)):
  15. print(i)
  16. print('Unordered')
  17. igroup = Group()
  18. for i in igroup.imap_unordered(intensive, xrange(3)):
  19. print(i)
  1. Size of group 3
  2. Hello from Greenlet 4340152592
  3. Size of group 3
  4. Hello from Greenlet 4340928912
  5. Size of group 3
  6. Hello from Greenlet 4340928592
  7. Ordered
  8. ('task', 0)
  9. ('task', 1)
  10. ('task', 2)
  11. Unordered
  12. ('task', 2)
  13. ('task', 1)
  14. ('task', 0)

池是一种结构,用于处理需要限制并发的动态数量的greenlets。在需要并行执行许多网络或IO绑定任务的情况下,这通常是可取的。

  1. import gevent
  2. from gevent.pool import Pool
  3. pool = Pool(2)
  4. def hello_from(n):
  5. print('Size of pool %s' % len(pool))
  6. pool.map(hello_from, xrange(3))
  1. Size of pool 2
  2. Size of pool 2
  3. Size of pool 1

通常在构建gevent驱动的服务时,会将整个服务围绕一个池结构进行中心处理。一个例子可能是在各种套接字(socket)上轮询的类。

  1. from gevent.pool import Pool
  2. class SocketPool(object):
  3. def __init__(self):
  4. self.pool = Pool(1000)
  5. self.pool.start()
  6. def listen(self, socket):
  7. while True:
  8. socket.recv()
  9. def add_handler(self, socket):
  10. if self.pool.full():
  11. raise Exception("At maximum pool size")
  12. else:
  13. self.pool.spawn(self.listen, socket)
  14. def shutdown(self):
  15. self.pool.kill()

锁和信号量

信号量是一种低级同步原语,它允许greenlet协调和限制并发访问或执行。信号量公开两种方法,获取和释放信号量被获取和释放的次数之差称为信号量的界限。如果信号量范围达到0,它就会阻塞,直到另一个greenlet释放它的捕获。

  1. from gevent import sleep
  2. from gevent.pool import Pool
  3. from gevent.coros import BoundedSemaphore
  4. sem = BoundedSemaphore(2)
  5. def worker1(n):
  6. sem.acquire()
  7. print('Worker %i acquired semaphore' % n)
  8. sleep(0)
  9. sem.release()
  10. print('Worker %i released semaphore' % n)
  11. def worker2(n):
  12. with sem:
  13. print('Worker %i acquired semaphore' % n)
  14. sleep(0)
  15. print('Worker %i released semaphore' % n)
  16. pool = Pool()
  17. pool.map(worker1, xrange(0,2))
  18. pool.map(worker2, xrange(3,6))
  1. Worker 0 acquired semaphore
  2. Worker 1 acquired semaphore
  3. Worker 0 released semaphore
  4. Worker 1 released semaphore
  5. Worker 3 acquired semaphore
  6. Worker 4 acquired semaphore
  7. Worker 3 released semaphore
  8. Worker 4 released semaphore
  9. Worker 5 acquired semaphore
  10. Worker 5 released semaphore

界限为1的信号量称为锁。它提供对一个greenlet的独占执行。它们通常用于确保资源只在程序上下文中使用一次。

翻译持续更新中 …