python中的GIL

  • GIL(Global Interpreter Lock),就是一个锁。
  • Python中的一个线程对应于 C语言 中的一个线程。
  • GIL使得同一时刻只有一个线程在一个cpu上执行字节码,无法将多个线程分配到多个cpu上进行同步运行。如果在多核cpu上,线程是并发运行,而不是并行。

HTB1aPOea._rK1Rjy0Fcq6zEvVXai.jpg

首先,这样效率不高,但是看似也不会产生数据访问冲突的问题,毕竟同一时刻只有一个线程在一个核上运行嘛,然而,看下面这个例子:

  1. sum = 0
  2. def add():
  3. global sum
  4. for i in range(1000000):
  5. sum += 1
  6. def subtract():
  7. global sum
  8. for i in range(1000000):
  9. sum -= 1
  10. import threading
  11. add_thread = threading.Thread(target=add)
  12. sub_thread = threading.Thread(target=subtract)
  13. add_thread.start()
  14. sub_thread.start()
  15. add_thread.join()
  16. sub_thread.join()
  17. print(sum)

如果按照上面的理解,线程间很安全,最后结果应该会是 0,运行三次代码的结果如下:

  1. # result:
  2. # 358918
  3. # 718494
  4. # -162684

这说明两个线程并没有顺序异步执行。在一些特定的情况,GIL这把锁会被打开,一定程度上达到并行的效果。

GIL会根据线程执行的字节码行数以及时间片以及遇到 I/O 操作打开,所以Python的多线程对 I/O 密集型代码比较友好,比如,文件处理和网络爬虫。

多线程编程

线程模块

Python3中提供了两个模块来使用线程_threadthreading,前者提供了低级别、原始的线程以及一个简单的锁,相比后者功能还是比较有限的,所以我们使用threading模块。

使用案例

直接使用Thread来实例化

  1. import time
  2. import threading
  3. def learn(obj):
  4. print("learning {sth} started".format(sth=obj))
  5. time.sleep(2)
  6. print("learning {sth} end".format(sth=obj))
  7. def play(obj):
  8. print("playing {sth} started".format(sth=obj))
  9. time.sleep(4)
  10. print("playing {sth} end".format(sth=obj))
  11. # 创建出两个线程对象
  12. learn_thread = threading.Thread(target=learn, args=("Python",))
  13. play_thread = threading.Thread(target=play, args=("Game",))
  14. start_time = time.time()
  15. # 启动线程,开始执行
  16. learn_thread.start()
  17. play_thread.start()
  18. end_time = time.time()
  19. span = end_time - start_time
  20. print("[lasting for {time_span}s]".format(time_span=span))
  21. # result:
  22. # learning Python started
  23. # playing Game started
  24. # [lasting for 0.0005965232849121094s]
  25. # learning Python end
  26. # playing Game end

可以看到,整个程序的运行时间基本上是 0s,这是因为整个程序中实际有三个线程:创建出来的两个线程和主线程(MainThread),那两个线程创建出来后就不受主线程控制了,他们的工作不占用主线程的时间,主线程除了计时就没有其他逻辑了,因此主线程持续时间是几乎是 0s。

但是主线程逻辑完成后并没有退出,它等待了另外两个线程运行的结束,如果主线程在其他两个线程结束之前就退出了,意味着整个程序进程终止了,另外两个线程会迅速终止。如果我们就是有这种需求,那就可以将另外两个线程配置成守护线程,主线程结束,他们也立刻结束。

  1. learn_thread.setDeamon(True)
  2. play_thread.setDeamon(True)
  3. # result:
  4. # learning Python started
  5. # playing Game started
  6. # [lasting for 0.015601158142089844s]

主线程逻辑执行完后就退出了,其他两个线程还没来得及打印消息也被一并终止了。

如果主线程需要等到两个线程执行完后再打印整个运行时间,就可以这么设置:

  1. learn_thread.start()
  2. play_thread.start()
  3. # 主线程会在这里阻塞,创建出来的线程执行完后才继续往下执行
  4. learn_thread.join()
  5. play_thread.join()
  6. end_time = time.time()
  7. span = end_time - start_time
  8. print("[lasting for {time_span}s]".format(time_span=span))
  9. # result:
  10. # learning Python started
  11. # playing Game started
  12. # learning Python end
  13. # playing Game end
  14. # [lasting for 4.003938436508179s]

现在总的运行时间是 4s,意味着主线程等待了两个子线程的运行,而且两个子线程是同步执行的,2s + 4s = max(2s, 4s)

通过继承Thread实现多线程

  1. import threading
  2. class LearnThread(threading.Thread):
  3. def __init__(self, obj):
  4. self.sth = obj # 处理一下参数问题
  5. super().__init__() # 委托给父类完成创建
  6. # 这个函数在 start() 之后会自动调用,里面写主要的业务逻辑
  7. def run(self):
  8. print("learning {sth} started".format(sth=self.sth))
  9. time.sleep(2)
  10. print("learning {sth} end".format(sth=self.sth))

用同样的逻辑实现PlayThread子线程类,最后结果如下:

  1. ...
  2. # 创建两个线程实例
  3. learn_thread = LearnThread("Python")
  4. play_thread = PlayThread("BasketBall")
  5. start_time = time.time()
  6. # 启动线程,开始执行
  7. learn_thread.start()
  8. play_thread.start()
  9. learn_thread.join()
  10. play_thread.join()
  11. end_time = time.time()
  12. span = end_time - start_time
  13. print("[lasting for {time_span}s]".format(time_span=span))
  14. # result:
  15. # learning Python started
  16. # playing BasketBall started
  17. # learning Python end
  18. # playing BasketBall end
  19. # [lasting for 4.0020973682403564s]

这种方式更加灵活,在线程内可以自定义我们的逻辑,如果线程非常复杂,这样写可以使程序更加模块化,也更容易后续维护。

线程间的通信

引言

如果程序中有多个线程,他们的推进顺序可能相互依赖,a线程执行到某一阶段后,b线程才能开始执行,b线程执行完毕后,a线程才能继续进行。
这样一种情况之下,线程之间就要进行通信,才能保证程序的正常运行。

通过共享变量来实现

线程安全不能保证,不推荐,就不详细讲解了。

通过Queue来进行线程通信

思路和共享变量差不多,只不过这里使用的数据结构是经过封装的,是线程安全的,使用起来也更加方便。

  1. import time
  2. import threading
  3. from queue import Queue
  4. # 用来向 queue 中加入数据
  5. def append(q):
  6. for i in range(4):
  7. print("[append_thread] putting data {data} to q...".format(data=i))
  8. q.put(i)
  9. time.sleep(1)
  10. q.put(None) # end flag
  11. # 用来向 queue 中取出数据
  12. def pop(q):
  13. while True:
  14. data = q.get()
  15. if data is None:
  16. print("[pop_thread] all clear in the queue")
  17. q.task_done()
  18. break
  19. else:
  20. print("[pop_thread] get data {d}".format(d=data))
  21. time.sleep(3)
  22. q.task_done()
  23. q = Queue(2)
  24. append_thread = threading.Thread(target=append, args=(q,))
  25. pop_thread = threading.Thread(target=pop, args=(q,))
  26. append_thread.start()
  27. pop_thread.start()
  28. q.join() # 利用 queue 来阻塞主线程
  29. print("\n===Done===")
  30. # result:
  31. # [append_thread] putting data 0 to q...
  32. # [pop_thread] get data 0
  33. # [append_thread] putting data 1 to q...
  34. # [append_thread] putting data 2 to q...
  35. # [pop_thread] get data 1
  36. # [append_thread] putting data 3 to q...
  37. # [pop_thread] get data 2
  38. # [pop_thread] get data 3
  39. # [pop_thread] all clear in the queue
  40. # ===Done===
  1. q = Queue(2)

这里创建了一个Queue类型对象,接收一个整数参数,告知队列的容量,如果传入一个非正数,容量默认是正无穷(当然,这取决于你电脑的配置情况)。

  1. q.get(block=True, timeout=None)
  2. q.put(item, block=True, timeout=None)

这两个方法向队列中添加元素,或取出元素。默认情况下,如果队列满了,调用q.put()会进行阻塞,直到队列中有空位才放入元素,完成整个函数调用。可以设置block=False将它转为非阻塞调用,如果队列满了,则直接引发一个Full exception,通过timeout来设置一定的等待时间,如果在阻塞等待时间内任然没空位放入元素,再抛出异常。q.get()方法逻辑类似,抛出异常Empty exception

  1. q.task_done()
  2. q.join()

q.join()通过队列来阻塞主线程,队列内部有一个计数器,每放入一个元素,计数器加一,当计数器重新归零后,解除阻塞。q.task_done()就是将计数器减一的,一般和q.get()配合使用,如果使用过量,导致计数器 小于0 ,则引发ValueError Exception

  1. q.qsize() # 获得队列中的元素个数
  2. q.empty() # 队列是否为空
  3. q.full() # 队列是否满

线程同步

引言

再回到最开始的GIL案例,两个线程,其中一个对全局变量进行一百万次加1运算,另外一个进行一百万次减1运算。最后全局变量的值是几乎随机的,与我们预想的 0
并不相同。因为两个线程是异步修改这个变量,不能保证某一时刻的取值就是正确的。

因此,要对线程进行同步控制,当一个线程操作时,另一个等待,然后交换执行。

使用Lock

  1. import threading
  2. sum = 0
  3. lock = threading.Lock()
  4. def add():
  5. global sum
  6. for i in range(1000000):
  7. lock.acquire() # lock here
  8. sum += 1
  9. lock.release() # unlock here
  10. def subtract():
  11. global sum
  12. for i in range(1000000):
  13. lock.acquire() # lock here
  14. sum -= 1
  15. lock.release() # unlock here
  16. add_thread = threading.Thread(target=add)
  17. sub_thread = threading.Thread(target=subtract)
  18. add_thread.start()
  19. sub_thread.start()
  20. add_thread.join()
  21. sub_thread.join()
  22. print(sum)
  23. # result:
  24. # 0

通过Lock,我们可以在执行相关代码之前申请,将一段代码逻辑锁起来,锁资源全局只有一个,一个线程申请了另外一个就不能够申请,它要等到资源释放后才能申请。因此,就保证了同一时刻只有一个线程拿到锁,只有一个线程能够进行变量的修改。

这种方式比较影响性能,获取锁释放锁都需要时间,也可能引起死锁问题,连续两次执行lock.acquire()就可以引发死锁,死锁可以通过另外一个线程来解开。这也是后面使用Condition的一个核心理念。

  1. import threading
  2. lock = threading.Lock()
  3. def dead_lock(lock):
  4. lock.acquire()
  5. lock.acquire() # 直接调用两次会死锁这个线程
  6. print("unlock")
  7. def un_lock(lock):
  8. lock.release() # 用这个线程来开锁
  9. dead_lock_thread = threading.Thread(target=dead_lock, args=(lock,))
  10. un_lock_thread = threading.Thread(target=un_lock, args=(lock,))
  11. dead_lock_thread.start() # 注意调用顺序
  12. un_lock_thread.start() # 通过它,死锁的线程会被打开,继续执行打印结果
  13. # result:
  14. # unlock

使用RLock

在同一个线程中,可以连续多次调用lock.acquire(),注意最后获取锁和释放锁的次数要相同。

  1. lock = threading.RLock()
  2. def add():
  3. global sum
  4. for i in range(1000000):
  5. lock.acquire() # lock here
  6. do_sth(lock) # 在对变量 +1 之前,要对变量做其他操作,在函数中可以再次加锁
  7. sum += 1
  8. lock.release() # unlock here

使用Condition

底层使用 RLock 实现的,实现了上下文管理器协议,可以用with语句进行操作,不用担心acquire()release()的问题。

  1. import threading
  2. msg = []
  3. conn = threading.Condition()
  4. def repeater_one(conn):
  5. with conn:
  6. global msg
  7. for i in range(3):
  8. data = "小伙子,没想到你也是复读机 ({idx})".format(idx=i)
  9. msg.append(data)
  10. print("[one]:", data)
  11. conn.notify()
  12. conn.wait()
  13. def repeater_two(conn):
  14. with conn:
  15. global msg
  16. for i in range(3):
  17. conn.wait()
  18. data = msg.pop()
  19. print("[two]:", data)
  20. conn.notify()
  21. repeater_one_thread = threading.Thread(target=repeater_one, args=(conn,))
  22. repeater_two_thread = threading.Thread(target=repeater_two, args=(conn,))
  23. # 注意启动的顺序非常重要
  24. repeater_two_thread.start()
  25. repeater_one_thread.start()
  26. # result:
  27. # [one]: 小伙子,没想到你也是复读机 (0)
  28. # [two]: 小伙子,没想到你也是复读机 (0)
  29. # [one]: 小伙子,没想到你也是复读机 (1)
  30. # [two]: 小伙子,没想到你也是复读机 (1)
  31. # [one]: 小伙子,没想到你也是复读机 (2)
  32. # [two]: 小伙子,没想到你也是复读机 (2)

这里实现了两个复读机线程,一个线程打印完数据后,另外一个线程进行复述,数据保留在全局变量msg中,通过Condition来协调两个线程的访问顺序,实现复读效果。

这种控制方式的思想就是当满足了某些条件,线程才能继续运行下去,否则,线程会一直阻塞,直到条件被满足。

  1. conn.notify()
  2. conn.wait()

当条件满足是,使用notify()用来唤醒等待的线程,要等待条件时,再使用wait()进行阻塞。repeater one首先说一句话,然后唤醒repeater two,自身进入等待状态;repeater two等到有人说话后,进行复读,然后唤醒repeater one再次说话,自身进入等待状态。要保证的是,每一时刻,只能有一个线程处于等待状态,否则两个线程都会被阻塞。因此,线程启动的顺序和阻塞唤醒条件非常重要。

  1. with conn:
  2. # todo
  3. # 或者
  4. conn.require()
  5. # todo
  6. conn.release()

这里直接使用with语句,省略了一些逻辑,也可以使用完整的写法,但是要注意操作的匹配。

Condition的底层其实使用了两层锁,当我们在一个线程中调用require()的时候,内部维护的一个锁(Rlock)会自动锁上,另外一个线程在调用require()时,就会被阻塞。

多线程 - 图2

多线程 - 图3

下面是wait()的主要逻辑,调用wait()会将锁打开self._release_save(),这就允许了另外一个线程调用require(),同时建立第二层锁,waiter,将它加入到队列(底层是 deque)中,每调用一次就会产生一把锁,同时调用waiter.acquire(),接着在后面,会用各种逻辑判断再次调用waiter.acquire(),前面讲过,连续两次调用会造成这个线程的阻塞。 那这个锁在哪里打开呢?在另外一个线程的notify()方法中,这个锁打开了,它就可以继续往下运行了。
多线程 - 图4
多线程 - 图5

使用Semaphore

用来控制资源使用数量的锁,对于文件来说,读操作可以有多个线程同时进行,共享文件资源,而写操作,就只能有一个线程来独占资源,一般用来控制线程的并发数量。

现在我们来控制读线程的并发数量,每一时刻只有 3 个线程在工作,而且工作时间是 2s,总共有10个读线程要完成操作。

  1. import threading
  2. import time
  3. sem = threading.Semaphore(3) # 指明信号的数量
  4. def read(sem):
  5. sem.acquire() # 拿到信号
  6. print("doing reading staff...")
  7. time.sleep(2)
  8. sem.release() # 释放信号
  9. for i in range(7):
  10. read_thread = threading.Thread(target=read, args=(sem,))
  11. read_thread.start()
  12. """
  13. 最后的结果是每两秒就有三个线程开始读操作
  14. """
  15. # result:
  16. # doing reading staff...
  17. # doing reading staff...
  18. # doing reading staff...
  19. # doing reading staff...
  20. # doing reading staff...
  21. # doing reading staff...
  22. # doing reading staff...

Semaphore的底层是使用Condition进行实现的,内部维护了一个_value变量,用来计数。

多线程 - 图6

acquire()release()内部的逻辑有些改变,在申请资源时,首先要看_value的值有没有减到 0 ,如果有,再有线程执行acquire()就会执行wait()进行阻塞,资源释放时要增加_value的值,同时使用notify()唤醒队列中等待的一个线程。

多线程 - 图7

多线程 - 图8

concurrent线程池

引言

为什么要线程池呢?回看上面的Semaphore例子,我们定义了一个数量为 3 的信号量,保证了同一时刻只有 3 个线程存在于内存中。但是从程序开始运行到结束,我们一共使用过 7 个线程range(7),完成了 7 次同样的读操作,也就是说创建了 7 次线程,又销毁了 7 次线程。如果每个线程的执行时间非常短,又需要创建大量的线程,那么资源都在创建/销毁线程的过程中被消耗了。

能不能总共就使用 3 个线程达到同样的效果呢?每个线程多做几次同样的操作逻辑就可以了,concurrent.futures就提供了这样的管理方案,同时,还有下面这些优点:

  • 主线程中可以获取某一个线程状态或者一个任务的状态,还有返回值。
  • 当一个线程完成工作后,主线程能立刻知道。
  • futures使得多线程多进程编码接口一致。

使用案例

  1. from concurrent.futures import ThreadPoolExecutor
  2. import time
  3. def read(sth):
  4. print("Reading {sth}...".format(sth=sth))
  5. time.sleep(1)
  6. return "{sth} done".format(sth=sth)
  7. executor = ThreadPoolExecutor(2) # 创建一个可以容纳 两个 线程的线程池
  8. if __name__ == "__main__":
  9. task_one = executor.submit(read, "books")
  10. task_two = executor.submit(read, "newspaper")
  11. task_three = executor.submit(read, "comics")
  12. print(task_one.done())
  13. time.sleep(4)
  14. print(task_two.done())
  15. print(task_three.done())
  16. # result:
  17. # Reading books...
  18. # Reading newspaper...
  19. # False
  20. # Reading comics...
  21. # True
  22. # True
  1. executor = ThreadPoolExecutor(2)

这里创建了一个容纳两个线程的线程池,如果不指定线程数量参数,它会以 5倍 cpu内核数量作为默认值。Python33.5, 3.6, 3.7版本的更新中都加入了可选参数,可以查看官方文档熟悉新的使用方式。

  1. submit(fn, *args, **kwargs)
  2. task_one = executor.submit(read, "books") # reture Future object
  3. task_two = executor.submit(read, "newspaper")
  4. task_three = executor.submit(read, "comics")

这里使用submit()方法向线程池中提交任务,提交的任务数量可以大于线程池中申请的线程数量。第一个参数是任务函数,后面依次列出参数。一旦任务被提交,线程池中的线程自动进行调度,直到所有提交任务的完成。提交任务后,会返回一个Future对象。

  1. """
  2. Future 类型对象会在 submit() 函数调用之后返回
  3. """
  4. future.done() # 如果提交的任务完成,这个方法会返回 True
  5. future.cancel() # 取消任务执行,如果任务已经调度执行,就不能取消
  6. future.result() # 返回任务函数执行后的返回结果

main中加入新的逻辑:

  1. ...
  2. print(task_one.result()) # 这些都是阻塞式调用,获得结果后才会继续向下执行
  3. print(task_two.result())
  4. print(task_three.result())
  5. # result:
  6. # books done
  7. # newspaper done
  8. # comics done

上面的写法其实比较麻烦,如果向线程池中提交的任务过多,这样操作每个Future对象会相当繁琐。可以批量进行任务的提交,将Future对象加入一个列表进行管理,配合使用模块中的as_complete函数,可以一次性获得所有执行完成任务函数的Future对象。

  1. from concurrent.futures import as_completed
  2. ...
  3. # ---修改main中的逻辑
  4. items = ["books", "newspaper", "comics"]
  5. task_list = [executor.submit(read, item) for item in items]
  6. for future in as_completed(task_list):
  7. print(future.result())
  8. # result:
  9. # newspaper done
  10. # books done
  11. # comics done

核心逻辑中,as_complete()将已经执行完的任务函数对应的Future对象通过yield进行返回,这里完成的顺序和任务提交的顺序并不一样,和内部的调度逻辑有关,我多次执行结果没有完全一样。这里的yield逻辑是在一定的条件下才会发生的,因此,只要有线程没有运行完,就无法yield结果,会在for这里进行阻塞,等到所有任务执行完毕之后,for结束。

多线程 - 图9

还有一种更加简洁的办法,在executor中,有一个和Python内置函数map()逻辑相似的函数。它将任务函数和参数进行一一匹配调用,直接返回future.result()对象。这种方式是顺序进行调度的,完成顺序总是:books,newspaper, comics

  1. ...
  2. items = ["books", "newspaper", "comics"]
  3. for res in executor.map(read, items):
  4. print(res)

补充

wait()函数

用来让主线程在不同条件下等待线程池中线程的运行。

  1. def wait(fs, timeout=None, return_when=ALL_COMPLETED):
  2. """
  3. 1. fs: Futures 对象的序列,当他们对应的任务函数都完成后,解除阻塞
  4. 2. timeout: 等待的时间,超过时间就不等了
  5. 3. return_when
  6. FIRST_COMPLETED: 任何一个任务函数完成
  7. FIRST_EXCEPTION: 任何一个任务函数执行时抛异常
  8. ALL_COMPLETED: 所有都完成
  9. 3个参数条件,哪个先满足就直接解除阻塞
  1. from concurrent.futures import ThreadPoolExecutor, wait
  2. import time
  3. def read(times):
  4. time.sleep(times)
  5. print("read for {span}s".format(span=times))
  6. executor = ThreadPoolExecutor(2)
  7. time_list = [1, 2, 3, 4]
  8. task_list = [executor.submit(read, times) for times in time_list]
  9. print("done")
  10. # result:
  11. # done
  12. # read for 1s
  13. # read for 2s
  14. # read for 3s
  15. # read for 4s

这里修改了read()函数的逻辑,由读不同的内容改为读不同的时间长度。这里主线程没有等待线程池中的任务,提交任务后直接执行了print()

  1. ...
  2. wait(task_list)
  3. print("done")
  4. # result:
  5. # read for 1s
  6. # read for 2s
  7. # read for 3s
  8. # read for 4s
  9. # done
  1. ...
  2. wait(task_list, 2) # 就等两秒钟
  3. print("done")
  4. # read for 1s
  5. # read for 2s
  6. # done
  7. # read for 3s
  8. # read for 4s
  1. from concurrent.futures import FIRST_COMPLETED
  2. # 注意常量值的使用,选一种合适的方法进行导入使用
  3. # from concurrent import futures
  4. # futures.FIRST_COMPLETED
  5. ...
  6. wait(task_list, return_when=FIRST_COMPLETED)
  7. print("done")
  8. # result:
  9. # read for 1s
  10. # done
  11. # read for 2s
  12. # read for 3s
  13. # read for 4s

Future对象

未来对象?随便叫啥吧。前面也看到过,通过submit()提交任务函数后,就会返回这么一个对象,可以通过它来监控运行我们任务函数的那个线程:判断是否运行完成、得到返回值等。

多线程 - 图10

从源码中,可以看出,任务提交之后,先创建了一个Future类型对象f,然后将这个对象连同我们提交的任务参数一起委托给了_WorkItem(),之后也返回一个对象w,将它加入队列self._work_queue,这应该就是内部调度的逻辑了,最后返回f

多线程 - 图11

_adjust_thread_count()用来创建出我们需要的线程数量,并且target=worker,参数列表中有self._work_queue作为参数。_threads是用来存放线程的一个集合。我们提交的不同任务函数怎么变成了一个_worker函数呢?

多线程 - 图12

_worker的主要逻辑中,有一个循环,从self._work_queue不断取出一个任务,它是_WorkItem类型的,在submit()函数中加入,然后调用它的run()方法,随后del work_item将它删除。多个任务线程读取的是同一个任务队列,直到任务全部完成。

多线程 - 图13

_WorkItem中,它的run()方法调用了我们提交的任务函数fn,并且记录了它的返回值,进行了一些异常处理。

多线程 - 图14

shutdown()

前面都在讲线程池的使用方法和工作原理,还有一个细节需要补充的就是executorshutdown()方法。

多线程 - 图15

它是用来关闭线程池,如果在关闭之后继续向里面提交任务,会抛出一个异常。调用之后,self._work_queue.put(None)往任务队列中加入了一个标记,当线程调度时拿到这个标志就知道任务结束了,这与在前面使用Queue进行线程间通信的案例用了同样的方式。

有一个可选参数wait,如果它是True,则主线程在这里阻塞,等待所有线程完成任务。如果是False,主线程继续向下执行。

可以使用with语句,当所有的任务完成之后自动调用shutdown(),这里就不再多举例子,官方文档 给出了一个经典的例子,以作参考。