昨日内容回顾

线程

什么是线程?

线程是 cpu 调度的最小单位

进程是资源分配的最小单位


进程和线程是什么关系?

线程是在进程中的 一个执行单位

多进程 本质上开启的这个进程里就有一个线程

多线程 单纯的在当前进程中开启了多个线程


线程和进程的区别:

线程的开启 销毁 任务切换的时间开销小

在同一个进程中数据共享

能实现并发,但不能脱离进程

进程负责管理分配资源 线程负责执行代码


GIL 锁 —— 全局解释器锁

同一时刻只能有一个线程访问 CPU —— 线程锁

Cpython 会受到 GIL 影响

而 pypy 和 jpython 不会受到 GIL 影响


python 程序效率下降的问题

高计算型 —— 多线程会导致程序的效率下降

高 IO 型的 —— 可以使用多线程,不会受到影响

多进程

分布式计算 —— celery(Python开发的分布式任务调度模块)

启动简单线程

  1. from threading import Thread
  2. def func():
  3. print(123)
  4. Thread(target=func).start()

执行输出:123

守护线程和守护进程的区别?

守护线程是等待主进程代码结束之后就结束

守护线程是等待主线程都结束之后才结束

主线程等待其他线程结束,才结束。

开启线程的第二种方法:是用类继承

  1. from threading import Thread
  2. import time
  3. class Sayhi(Thread):
  4. def __init__(self,name):
  5. super().__init__()
  6. self.name=name
  7. def run(self):
  8. time.sleep(2)
  9. print('%s say hello' % self.name)
  10. if __name__ == '__main__':
  11. t = Sayhi('egon')
  12. t.start()
  13. print('主线程')

执行输出:

主线程

egon say hello

一、Thread 类的其他方法

  1. Thread实例对象的方法
  2. # isAlive(): 返回线程是否活动的。
  3. # getName(): 返回线程名。
  4. # setName(): 设置线程名。
  5. threading模块提供的一些方法:
  6. # threading.currentThread(): 返回当前的线程变量。
  7. # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  8. # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

举例:

  1. from threading import Thread
  2. def func():
  3. print(123)
  4. t = Thread(target=func)
  5. t.start()
  6. print(t.is_alive()) # 返回线程是否是活动的

执行输出:

123

False

睡 0.1 秒

  1. import time
  2. from threading import Thread
  3. def func():
  4. time.sleep(0.1)
  5. print(123)
  6. t = Thread(target=func)
  7. t.start()
  8. print(t.is_alive())

执行输出:

True

123

  1. import time
  2. from threading import Thread
  3. def func():
  4. time.sleep(0.1)
  5. print(123)
  6. t = Thread(target=func)
  7. t.start()
  8. print(t.is_alive()) # 返回线程是否活动的
  9. print(t.getName()) #返回线程名
  10. t.setName('t1') # 设置线程名
  11. print(t.getName())

执行输出:

True

Thread-1

t1

123

  1. import time
  2. from threading import Thread,currentThread,enumerate,activeCount
  3. def func():
  4. time.sleep(0.1)
  5. #print(123)
  6. t = Thread(target=func)
  7. t.start()
  8. print(currentThread) # 返回当前的线程变量
  9. print(enumerate()) #返回一个包含正在运行的线程的list
  10. print(activeCount()) # 返回正在运行的线程数量

执行输出:

[<_MainThread(MainThread, started 20008)>, ]

2

MainThread, started 20008 表示主线程,Thread-1, started 20232 表示子线程。它会打印出 2 个。

所以 activeCount 的结果为 2

二、同步锁

当多线程争夺锁时,允许第一个获得锁的线程进入临街区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程执行结束,退出临街区,并释放锁。

多个线程抢占资源的情况:

  1. import time
  2. from threading import Thread
  3. def func():
  4. global n
  5. temp = n
  6. time.sleep(1)
  7. n = temp -1
  8. n = 100
  9. t_lst = []
  10. for i in range(100):
  11. t = Thread(target=func)
  12. t.start()
  13. t_lst.append(t)
  14. for t in t_lst:t.join()
  15. print(n)

执行输出:99

为啥呢?

明明减了 100 次,结果应该是 0 的。

为啥是 99 呢?难度是 GIL 的问题,但 GIL 是计算 CPU 那一刻的锁

下面开始具体分析:

第一步,每个线程执行 global n:temp =n 此时,temp 等于 100

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图1

第二步,当线程设计到 CPU 计算时,向 CPU 发送请求。但是收到 GIL 的限制

同一时刻,只能有一个线程计算。

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图2

CPU 计算结果后,返回给线程。线程赋值,并修改全局变量 n

此时 n=99,线程结束

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图3

那么其他线程,也是做同样的操作

每个线程赋值 n 等于 99。不管它已经是 99 了。

上面的现象,出现了数据不安全的情况

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图4

最后赋值了 100 次,都是 n=99。所以最终结果是 99

怎么解决呢?加锁

  1. from threading import Thread,Lock
  2. def func(lock):
  3. global n
  4. lock.acquire() # 加锁
  5. temp = n
  6. n = temp -1
  7. lock.release() # 解锁
  8. n = 100
  9. t_lst = []
  10. lock = Lock() # 创建锁
  11. for i in range(100):
  12. t = Thread(target=func,args=(lock,))
  13. t.start()
  14. t_lst.append(t)
  15. for t in t_lst:t.join() # 等待所有子线程结束
  16. print(n)

执行输出:0

如果把计算和赋值拆开,就会出现数据不安全的情况

下面写法,不用加锁,也可以得到 0

  1. from threading import Thread
  2. def func():
  3. global n
  4. n -= 1
  5. n = 100
  6. for i in range(100):
  7. t = Thread(target=func)
  8. t.start()<br>print(n)

执行输出:0

因为默认有一个 GIL 锁,所以每个线程都减等 1。所以最终结果为 0

三、死锁与递归锁

死锁

进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

  1. from threading import Lock
  2. lock = Lock() # 在同一个线程中,能够被一个锁的多个acquire阻住,这种锁就叫做互斥锁
  3. lock.acquire()
  4. lock.acquire()
  5. lock.acquire()

死锁,也叫互斥锁

科学家吃面的问题

要完成一件事情 需要两个必要因素

要想吃到面,需要: 叉子,面

资源的互相抢占的问题 —— 死锁

四个人围着一张桌子,桌子上放着一碗面,碗里有一个叉子

必须拿到叉子,才能吃面。

每个人每次只能吃一口面,吃完就放回去,让下一个人吃。

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图5

  1. import time
  2. from threading import Thread,Lock
  3. def eat1(noodle_lock,fork_lock,name): # 吃面
  4. noodle_lock.acquire() # 面条加锁
  5. print('%s 抢到了面条'%name)
  6. fork_lock.acquire() # 叉子加锁
  7. print('%s 抢到了叉子'%name)
  8. print('%s 正在吃面'%name)
  9. fork_lock.release() # 叉子解锁
  10. print('%s归还了叉子'%name)
  11. noodle_lock.release() # 面条解锁
  12. print('%s归还了面条' % name)
  13. def eat2(noodle_lock,fork_lock,name): #也是吃面,不同的是:先抢叉子,再抢面
  14. fork_lock.acquire()
  15. print('%s抢到了叉子' % name)
  16. time.sleep(0.5)
  17. noodle_lock.acquire()
  18. print('%s抢到了面'%name)
  19. print('%s正在吃面'%name)
  20. noodle_lock.release()
  21. print('%s归还了面' % name)
  22. fork_lock.release()
  23. print('%s归还了叉子' % name)
  24. noodle_lock = Lock() # 面条锁
  25. fork_lock = Lock() # 叉子锁
  26. t1 = Thread(target=eat1,args=(noodle_lock,fork_lock,'nazha')).start()
  27. t2 = Thread(target=eat2,args=(noodle_lock,fork_lock,'egon')).start()
  28. t3 = Thread(target=eat1,args=(noodle_lock,fork_lock,'yuan')).start()
  29. t4 = Thread(target=eat2,args=(noodle_lock,fork_lock,'alex')).start()

执行输出:

nazha 抢到了面条

nazha 抢到了叉子

nazha 正在吃面

nazha 归还了叉子

nazha 归还了面条

egon 抢到了叉子

yuan 抢到了面条

发现程序卡住了,为啥呢?

egon 和 yuan 分别抢到了叉子和面条,但是谁也不愿意给对方。互掐在…

那么就出现了死锁现象。

只有一个锁,不会出现死锁。在多个锁的情况下,就会出现死锁

如何解决这个问题呢?使用递归锁

递归锁

在 Python 中为了支持在同一线程中多次请求同一资源,python 提供了可重入锁 RLock。

这个 RLock 内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 require。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源

简单例子:

  1. from threading import Thread,RLock
  2. rlock = RLock() # 创建递归锁
  3. rlock.acquire() # 加第一个锁
  4. print('***')
  5. rlock.acquire() # 加第二个锁
  6. print('***')

执行输出:



发现瞬间就输出了,程序没有卡顿,666

结论:

递归锁在同一个线程中对同一个锁多次 acquire 不会产生阻塞

看下面的例子

  1. from threading import Thread,RLock
  2. def func(rlock,flag):
  3. rlock.acquire() # 第一道锁
  4. print(flag*10)
  5. rlock.acquire() # 第二道锁
  6. print(flag * 10)
  7. rlock.acquire() # 第三道锁
  8. print(flag * 10)
  9. rlock.acquire() # 第四道锁
  10. print(flag * 10)
  11. rlock = RLock() # 创建递归锁
  12. Thread(target=func,args=(rlock,'*')).start() # 传入递归锁和*
  13. Thread(target=func,args=(rlock,'-')).start()

执行输出:





发现程序卡住了,————-没有输出

为什么?递归锁搞不定?

看下图

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图6

第一个线程过来,拿走了一串钥匙

每次 acquire,就进入一个房间

最后在房间的最里面,没出来。

第二个线程,怎么进入房间呢?

需要第一个线程执行 release 操作,从最里面的房间走出来。

它每走出一个房间,需要 release 一次。将钥匙放到钥匙串上面,归还到门口。

由第二个线程拿走钥匙串才行。

它跟递归函数类似

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图7

怎么解决上面的卡住的问题呢?

必须是 4 次 relase

  1. from threading import Thread,RLock
  2. def func(rlock,flag):
  3. rlock.acquire() # 第一道锁
  4. print(flag*10)
  5. rlock.acquire() # 第二道锁
  6. print(flag * 10)
  7. rlock.acquire() # 第三道锁
  8. print(flag * 10)
  9. rlock.acquire() # 第四道锁
  10. print(flag * 10)
  11. rlock.release() # 解锁
  12. rlock.release()
  13. rlock.release()
  14. rlock.release()
  15. rlock = RLock() # 创建递归锁
  16. Thread(target=func,args=(rlock,'*')).start()
  17. Thread(target=func,args=(rlock,'-')).start()

执行输出:









使用递归锁解决科学家吃面的问题

  1. import time
  2. from threading import Thread,RLock
  3. def eat1(noodle_lock,fork_lock,name): # 吃面
  4. noodle_lock.acquire() # 拿到面条的整串钥匙
  5. print('%s 抢到了面条'%name)
  6. fork_lock.acquire() # 拿到叉子的整串钥匙
  7. print('%s 抢到了叉子'%name)
  8. print('%s 正在吃面'%name)
  9. fork_lock.release() # 叉子解锁
  10. print('%s归还了叉子'%name)
  11. noodle_lock.release() # 面条解锁
  12. print('%s归还了面条' % name)
  13. def eat2(noodle_lock,fork_lock,name): #也是吃面,不同的是:先抢叉子,再抢面
  14. fork_lock.acquire()
  15. print('%s抢到了叉子' % name)
  16. time.sleep(0.5)
  17. noodle_lock.acquire()
  18. print('%s抢到了面'%name)
  19. print('%s正在吃面'%name)
  20. noodle_lock.release()
  21. print('%s归还了面' % name)
  22. fork_lock.release()
  23. print('%s归还了叉子' % name)
  24. noodle_lock = fork_lock = RLock() # 面条锁和叉子锁,表示一串钥匙
  25. t1 = Thread(target=eat1,args=(noodle_lock,fork_lock,'nazha')).start()
  26. t2 = Thread(target=eat2,args=(noodle_lock,fork_lock,'egon')).start()
  27. t3 = Thread(target=eat1,args=(noodle_lock,fork_lock,'yuan')).start()
  28. t4 = Thread(target=eat2,args=(noodle_lock,fork_lock,'alex')).start()

执行输出:

nazha 抢到了面条

nazha 抢到了叉子

nazha 正在吃面

nazha 归还了叉子

nazha 归还了面条

egon 抢到了叉子

egon 抢到了面

egon 正在吃面

egon 归还了面

egon 归还了叉子

yuan 抢到了面条

yuan 抢到了叉子

yuan 正在吃面

yuan 归还了叉子

yuan 归还了面条

alex 抢到了叉子

alex 抢到了面

alex 正在吃面

alex 归还了面

alex 归还了叉子

4 个人可以欢快的吃面了

什么情况下,需要用到递归锁呢?

有超过一个资源需要锁的时候,使用递归锁

有 2 个屌丝,一个拿着外层钥匙,一个拿着里层钥匙,谁也不想给对方

就出现了死锁

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图8

四、信号量

同进程的一样

Semaphore 管理一个内置的计数器,

每当调用 acquire()时内置计数器-1;

调用 release() 时内置计数器+1;

计数器不能小于 0;当计数器为 0 时,acquire()将阻塞线程直到其他线程调用 release()。

实例:(同时只有 5 个线程可以获得 semaphore,即可以限制最大连接数为 5):

  1. import time
  2. from threading import Thread,Semaphore
  3. def func(sem,i):
  4. sem.acquire()
  5. print(i)
  6. time.sleep(1)
  7. sem.release()
  8. sem = Semaphore(5)
  9. for i in range(6):
  10. Thread(target=func,args=(sem,i)).start()

执行输出:

0

1

2

3

4

5

池与信号量

  1. 与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

五、事件

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用 threading 库中的 Event 对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event 对象中的信号标志被设置为假。如果有线程等待一个 Event 对象, 而这个 Event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 Event 对象的信号标志设置为真,它将唤醒所有等待这个 Event 对象的线程。如果一个线程等待一个已经被设置为真的 Event 对象,那么它将忽略这个事件, 继续执行

  1. event.isSet():返回event的状态值;
  2. event.wait():如果 event.isSet()==False将阻塞线程;
  3. event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
  4. event.clear():恢复event的状态值为False

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图9

例如,有多个工作线程尝试链接 MySQL,我们想要在链接前确保 MySQL 服务正常才让那些工作线程去连接 MySQL 服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用 threading.Event 机制来协调各个工作线程的连接操作

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图10

连接数据库

  1. import time
  2. import random
  3. from threading import Event,Thread
  4. # 连接数据库
  5. def connect_db(e):
  6. count = 1 # 初始计数器
  7. while count < 4:
  8. print('尝试第%s次检测连接'%count)
  9. e.wait(0.5) # 等待0.5,再去执行下面的代码
  10. # 如果不传参数会一直等到事件为True为止
  11. # 如果传参数 传一个时间参数,到时间后,
  12. count += 1 # 加1
  13. if e.is_set(): # 判断状态是否为True
  14. print('连接成功')
  15. break
  16. else:
  17. print('连接失败')
  18. def check_conn(e):
  19. '''检测数据库是否可以连接'''
  20. time.sleep(random.randint(1,2)) # 等待1~2秒
  21. e.set() # 设置状态为True
  22. e = Event()
  23. Thread(target=check_conn,args=(e,)).start()
  24. Thread(target=connect_db,args=(e,)).start()

执行输出:

尝试第 1 次检测连接

尝试第 2 次检测连接

连接成功

结果是不固定的。有 3 种情况

  1. check_conn 等待 1 秒,尝试 2 次,连接成功

  2. check_conn 等待 1 秒,尝试 3 次,连接成功

  3. check_conn 等待 2 秒,尝试 3 次,连接失败

第 3 种情况是必然的,为什么呢?

如果等待 2 秒,e.wait(0.5)执行 4 次,才能达到 2 秒。但是 whlie 循环,不可能达到 4。所以 3 次全部失败。

第 1 种情况,等待 1 秒,e.wait(0.5)必然执行 2 次,才能达到 1 秒,状态变成 True,输出连接成功。

第 2 种情况,是偶然事件。因为 CPU 同一时刻只能执行一个线程。所以失败了 3 次,才输出连接成功

什么时候,用到事件?

你要做一件事情 是有前提的

你就先去处理前提的问题 —— 前提处理好了 把状态设置成True

来控制即将要做的事情可以开始

条件

使得线程等待,只有满足某条件时,才释放 n 个线程

  1. Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquirerelease方法外,还提供了waitnotify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

起 10 个线程

  1. from threading import Condition,Thread
  2. # acquire release
  3. # notify -- wait的行为
  4. # 10线程 wait
  5. # notify(1)
  6. def func(i,con):
  7. con.acquire() # 进入锁定池
  8. con.wait() # 等待通知,前后必须要有acquire和release
  9. print(i*'*')
  10. con.release() # 解锁
  11. con = Condition() #条件变量
  12. for i in range(10):
  13. Thread(target=func,args=(i,con)).start()
  14. while True:
  15. n = int(input('>>>'))
  16. con.acquire() # 加锁
  17. con.notify(n) # 通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件,解除wait状态。前后必须要有acquire和release
  18. con.release() # 解锁

执行输出:

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图11

首先输入 2,显示 1 个*和 0 个星号,也就是空。

再输入 5,分别显示 2~6 个*号

为什么输入 5 的时候,出现 6 个*号?

因为线程池有 10 个,不等长度的*号

输入 2,表示解除 2 个线程的 wait 状态,分别表示 0,1。

那么 5,表示 5 个。分别表示 2,3,4,5,6

从结果上来看,发生数据混乱

con.notify(n) 表示按照要求放开线程

con.notify_all() 表示一次性放开线程

总结:

semaphore 允许同一时刻 n 个线程执行这段代码

event 有一个内部的事件来控制 wait 的行为,控制的是所有的线程

condition 有一个内部的条件来控制 wait 的行为,它可以逐个或者分批次的控制线程的走向

六、定时器

定时器,指定 n 秒后执行某个操作

  1. import time
  2. from threading import Timer
  3. def func():
  4. print('*'*10)
  5. print('子线程',time.strftime("%Y-%m-%d %H:%M:%S"))
  6. t =Timer(5,func) # 要开始一个线程,等到5秒之后才开启并执行
  7. t.start()
  8. print('主进程',time.strftime("%Y-%m-%d %H:%M:%S"))

执行输出:

主进程 2018-05-17 19:41:08


子线程 2018-05-17 19:41:13

虽然可以用 time.sleep(5)完成 5 秒的过程,但是它会阻塞主线程

如果用 timer,就不会,它是异步的。

七、线程队列

queue 队列 :使用 import queue,用法与进程 Queue 一样

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先进先出

  1. import queue
  2. q=queue.Queue()
  3. q.put('first')
  4. q.put('second')
  5. q.put('third')
  6. print(q.get())
  7. print(q.get())
  8. print(q.get())
  9. '''
  10. 结果(先进先出):
  11. first
  12. second
  13. third
  14. '''

class **queue.LifoQueue(maxsize=0) #last in fisrt out**

  1. import queue
  2. q=queue.LifoQueue()
  3. q.put('first')
  4. q.put('second')
  5. q.put('third')
  6. print(q.get())
  7. print(q.get())
  8. print(q.get())
  9. '''
  10. 结果(后进先出):
  11. third
  12. second
  13. first
  14. '''

class **queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列**

优先级队列

  1. import queue
  2. q=queue.PriorityQueue()
  3. #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
  4. q.put((20,'a'))
  5. q.put((10,'b'))
  6. q.put((30,'c'))
  7. print(q.get())
  8. print(q.get())
  9. print(q.get())
  10. '''
  11. 结果(数字越小优先级越高,优先级高的优先出队):
  12. (10, 'b')
  13. (20, 'a')
  14. (30, 'c')
  15. '''

更多方法说明

  1. Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
  2. The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
  3. exception queue.Empty
  4. Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
  5. exception queue.Full
  6. Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
  7. Queue.qsize()
  8. Queue.empty() #return True if empty
  9. Queue.full() # return True if full
  10. Queue.put(item, block=True, timeout=None)
  11. Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
  12. Queue.put_nowait(item)
  13. Equivalent to put(item, False).
  14. Queue.get(block=True, timeout=None)
  15. Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
  16. Queue.get_nowait()
  17. Equivalent to get(False).
  18. Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
  19. Queue.task_done()
  20. Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
  21. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
  22. Raises a ValueError if called more times than there were items placed in the queue.
  23. Queue.join() block直到queue被消费完毕

后进先出

  1. import queue
  2. q = queue.LifoQueue()
  3. q.put('first')
  4. q.put('second')
  5. q.put('third')
  6. print(q.get())

执行输出:third

先进先出

  1. import queue
  2. q = queue.Queue()
  3. q.put('first')
  4. q.put('second')
  5. q.put('third')
  6. print(q.get())

执行输出:third

优先级队列

  1. import queue
  2. q = queue.PriorityQueue()
  3. q.put((1,'a'))
  4. q.put((2,'z'))
  5. q.put((3,'b'))
  6. print(q.get())

执行输出:(1, ‘a’)

数字越小,优先级越高

有一个场景,你冲了 VIP,你就先买到票

如果有 2 个人,优先级一样呢?

  1. import queue
  2. q = queue.PriorityQueue()
  3. q.put((2,'z'))
  4. q.put((2,'a'))
  5. print(q.get())

执行输出:

(2, ‘a’)

结果是根据 ascii 码的顺序来排序的

八、Python 标准模块—concurrent.futures

https://docs.python.org/dev/library/concurrent.futures.html

  1. #1 介绍
  2. concurrent.futures模块提供了高度封装的异步调用接口
  3. ThreadPoolExecutor:线程池,提供异步调用
  4. ProcessPoolExecutor: 进程池,提供异步调用
  5. Both implement the same interface, which is defined by the abstract Executor class.
  6. #2 基本方法
  7. #submit(fn, *args, **kwargs)
  8. 异步提交任务
  9. #map(func, *iterables, timeout=None, chunksize=1)
  10. 取代for循环submit的操作
  11. #shutdown(wait=True)
  12. 相当于进程池的pool.close()+pool.join()操作
  13. wait=True,等待池内所有任务执行完毕回收完资源后才继续
  14. wait=False,立即返回,并不会等待池内的任务执行完毕
  15. 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
  16. submitmap必须在shutdown之前
  17. #result(timeout=None)
  18. 取得结果
  19. #add_done_callback(fn)
  20. 回调函数

ProcessPoolExecutor

  1. #介绍
  2. The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
  3. class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
  4. An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
  5. #用法
  6. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  7. import os,time,random
  8. def task(n):
  9. print('%s is runing' %os.getpid())
  10. time.sleep(random.randint(1,3))
  11. return n**2
  12. if __name__ == '__main__':
  13. executor=ProcessPoolExecutor(max_workers=3)
  14. futures=[]
  15. for i in range(11):
  16. future=executor.submit(task,i)
  17. futures.append(future)
  18. executor.shutdown(True)
  19. print('+++>')
  20. for future in futures:
  21. print(future.result())

ThreadPoolExecutor

  1. #介绍
  2. ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
  3. class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
  4. An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
  5. Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
  6. New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
  7. #用法
  8. ProcessPoolExecutor相同

map的用法

  1. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  2. import os,time,random
  3. def task(n):
  4. print('%s is runing' %os.getpid())
  5. time.sleep(random.randint(1,3))
  6. return n**2
  7. if __name__ == '__main__':
  8. executor=ThreadPoolExecutor(max_workers=3)
  9. # for i in range(11):
  10. # future=executor.submit(task,i)
  11. executor.map(task,range(1,12)) #map取代了for+submit

回调函数

  1. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  2. from multiprocessing import Pool
  3. import requests
  4. import json
  5. import os
  6. def get_page(url):
  7. print('<进程%s> get %s' %(os.getpid(),url))
  8. respone=requests.get(url)
  9. if respone.status_code == 200:
  10. return {'url':url,'text':respone.text}
  11. def parse_page(res):
  12. res=res.result()
  13. print('<进程%s> parse %s' %(os.getpid(),res['url']))
  14. parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
  15. with open('db.txt','a') as f:
  16. f.write(parse_res)
  17. if __name__ == '__main__':
  18. urls=[
  19. 'https://www.baidu.com',
  20. 'https://www.python.org',
  21. 'https://www.openstack.org',
  22. 'https://help.github.com/',
  23. 'http://www.sina.com.cn/'
  24. ]
  25. # p=Pool(3)
  26. # for url in urls:
  27. # p.apply_async(get_page,args=(url,),callback=pasrse_page)
  28. # p.close()
  29. # p.join()
  30. p=ProcessPoolExecutor(3)
  31. for url in urls:
  32. p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
  1. import time
  2. from concurrent.futures import ThreadPoolExecutor
  3. def func(i):
  4. print(i*'*')
  5. time.sleep(1)
  6. thread_pool = ThreadPoolExecutor(5) #创建一个最大可容纳2个任务的线程池
  7. thread_pool.submit(func,1) # 异步提交任务,往线程池里面加入一个任务

执行输出:

*

添加 10 个任务

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor
  3. def func(i):
  4. print(i*'*')
  5. time.sleep(1)
  6. thread_pool = ThreadPoolExecutor(5) #创建一个最大可容纳2个任务的线程池
  7. for i in range(6):
  8. thread_pool.submit(func,i) # 异步提交任务,往线程池里面加入一个任务
  9. print('wahaha')

执行输出:

*

**



wahaha


thread_pool.submit 相当于之前线程的 apply_async,表示异步

由于等到了 1 秒,所以优先输出 wahaha

如果想最后打印 wahaha 呢?

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor
  3. def func(i):
  4. print(i*'*')
  5. time.sleep(1)
  6. thread_pool = ThreadPoolExecutor(5) #创建一个最大可容纳2个任务的线程池
  7. for i in range(6):
  8. thread_pool.submit(func,i) # 异步提交任务,往线程池里面加入一个任务
  9. thread_pool.shutdown() # 相当于进程池的pool.close()+pool.join()操作
  10. print('wahaha')

执行输出:

*

**




wahaha

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor
  3. def func(i):
  4. print(i*'*')
  5. time.sleep(1)
  6. return i**2
  7. thread_pool = ThreadPoolExecutor(5) #创建一个最大可容纳2个任务的线程池
  8. ret_lst = []
  9. for i in range(6):
  10. ret = thread_pool.submit(func,i) # 异步提交任务,往线程池里面加入一个任务
  11. ret_lst.append(ret)
  12. thread_pool.shutdown() # 相当于进程池的pool.close()+pool.join()操作
  13. for ret in ret_lst:
  14. print(ret.result()) # 取得结果
  15. print('wahaha')

执行输出:

*

**




0

1

4

9

16

25

wahaha

到目前为止和 ThreadPool 的区别

Day42 Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures - 图12

回调函数

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor
  3. def func(i):
  4. print(i*'*')
  5. time.sleep(1)
  6. return i**2
  7. def callback(arg): #回调函数
  8. print(arg.result()*'-') # 取得结果,并乘以-
  9. thread_pool = ThreadPoolExecutor(5) #创建一个最大可容纳2个任务的线程池
  10. for i in range(6):
  11. ret = thread_pool.submit(func,i).add_done_callback(callback) # 异步提交任务,执行回调函数
  12. thread_pool.shutdown() # 相当于进程池的pool.close()+pool.join()操作
  13. print('wahaha')

执行输出:

*

**






-



wahaha

未来建议使用 concurrent 里面的 pooll

做效率测试,比较方便

什么情况下,使用进程池/线程池 ?

当内存不需要共享,且高计算的时候 用进程

当内存需要共享,且高 IO 的时候 用线程

当并发很大的时候

多进程 : 多个任务 —— 进程池 :cpu个数、cpu个数+1

多线程 :多个任务 —— 线程池 :cpu个数*5

4 核 CPU,可以开 4~5 个进程,开 20 条线程/进程。最终可以开 80~100 个任务

最美的程序,不是单一使用。而是复合使用

比如开多个进程,每个进程,开多个线程。

当后面学到协程,并发(qps)能达到 5 万

锁的概念,面试,会经常问到,还有队列

真正工作过程中,锁会用到,其他的,很少用。

还有进程池/线程池也会用到

明日默写:

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  3. def func(i):
  4. print(i*'*')
  5. time.sleep(1)
  6. return i**2
  7. def callb(arg):
  8. print(arg.result()*'-')
  9. if __name__ == '__main__':
  10. thread_pool = ThreadPoolExecutor(5)
  11. for i in range(10):
  12. thread_pool.submit(func,i).add_done_callback(callb) # 相当于apply_async
  13. thread_pool.shutdown() # close+join
  14. print('wahaha')