昨日内容回顾

python 中启动子进程

并发编程

并发 :多段程序看起来是同时运行的

ftp 网盘

不支持并发

socketserver 多进程 并发

异步

两个进程 分别做不同的事情

创建新进程

join:阻塞 直到 子进程结束

守护进程 daemon:子(守护)进程随着主进程代码的结束而结束

进程之间数据隔离

使用类来开启一个进程 :自定义类 继承 Process 重写 run 方法 传参数需要重写 init

属性 pid name

方法 terminate is_alive

作业讲解:

socket 聊天并发实例,使用原生 socket 的 TCP 协议,实现一个聊天的并发实例

先来一个简单的,单线程

server.py

  1. import socket
  2. sk = socket.socket()
  3. sk.bind(('127.0.0.1',9000))
  4. sk.listen()
  5. conn,addr = sk.accept()
  6. msg = conn.recv(1024).decode('utf-8')
  7. conn.send((msg+'sb').encode('utf-8'))
  8. print(msg)
  9. conn.close()

client.py

  1. import socket
  2. sk = socket.socket()
  3. sk.connect(('127.0.0.1',9000))
  4. inp = input('>>>').encode('utf-8')
  5. sk.send(inp)
  6. msg = sk.recv(1024).decode('utf-8')
  7. print(msg)
  8. sk.close()

先执行 server.py,再执行 client.py

输出:

111

111sb

如果要多次会话呢?加一个 while 循环

server.py

  1. import socket
  2. sk = socket.socket()
  3. sk.bind(('127.0.0.1',9000))
  4. sk.listen()
  5. conn,addr = sk.accept()
  6. while True:
  7. msg = conn.recv(1024).decode('utf-8')
  8. conn.send((msg+'sb').encode('utf-8'))
  9. print(msg)
  10. conn.close()
  11. sk.close()

client.py

  1. import socket
  2. sk = socket.socket()
  3. sk.connect(('127.0.0.1',9000))
  4. while True:
  5. inp = input('>>>').encode('utf-8')
  6. sk.send(inp)
  7. msg = sk.recv(1024).decode('utf-8')
  8. print(msg)
  9. sk.close()

但是这样,只能一个客户端和服务器连接,再来一个客户端,就卡住了。

sk.accept()是阻塞的,那么,如何改成异步呢?

将 server 里面的 recv 和 send 放到一个函数里面,使用多线程调用

server.py

  1. import socket
  2. from multiprocessing import Process
  3. def chat(conn): # 聊天
  4. while True:
  5. msg = conn.recv(1024).decode('utf-8')
  6. print(msg)
  7. conn.send((msg + '_sb').encode('utf-8'))
  8. conn.close()
  9. if __name__ == '__main__':
  10. sk = socket.socket()
  11. sk.bind(('127.0.0.1',9000))
  12. sk.listen()
  13. while True:
  14. conn,addr = sk.accept()
  15. Process(target=chat,args=[conn,]).start() # 异步
  16. sk.close()

先执行 server.py,再执行 client.py

多开几个客户端,也没问题

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952378-c1818e02-f8a5-4b1e-b78e-4b8366932b7e.png)

这就有点像接线员一样,接到客户打的电话,转给相应的人处理

这样,就有多个子进程,同时执行,实现多个客户端同时连接

进程同步(multiprocess.Lock、Semaphore、Event)

锁 —— multiprocess.Lock

通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用 IO 资源,但是也给我们带来了新的问题。

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

多进程抢占输出资源

  1. import os
  2. import time
  3. import random
  4. from multiprocessing import Process
  5. def work(n):
  6. print('%s: %s is running' %(n,os.getpid()))
  7. time.sleep(random.random())
  8. print('%s:%s is done' %(n,os.getpid()))
  9. if __name__ == '__main__':
  10. for i in range(3):
  11. p=Process(target=work,args=(i,))
  12. p.start()

执行输出:

1: 16548 is running

0: 1448 is running

2: 1096 is running

2:1096 is done

0:1448 is done

1:16548 is done

看输出结果,都是乱的。

如果想有序的执行,先 run,再 done,怎么办?

需要用到锁

  1. import os
  2. import time
  3. import random
  4. from multiprocessing import Lock
  5. from multiprocessing import Process
  6. def work(n,lock):
  7. lock.acquire() #取得锁
  8. print('%s: %s is running' %(n,os.getpid()))
  9. time.sleep(random.random())
  10. print('%s:%s is done' %(n,os.getpid()))
  11. lock.release() #释放锁
  12. if __name__ == '__main__':
  13. lock = Lock() #创建锁
  14. for i in range(5):
  15. p=Process(target=work,args=(i,lock))
  16. p.start()

执行输出:

0: 17468 is running

0:17468 is done

2: 16688 is running

2:16688 is done

1: 15984 is running

1:15984 is done

3: 15828 is running

3:15828 is done

4: 18156 is running

4:18156 is done

从结果上来看,结果就比较整齐了。先 run,再 done。结果是无序的

acquire参数

  1. #创建锁
  2. mutex = threading.Lock()
  3. #锁定
  4. mutex.acquire([timeout])#timeout是超时时间
  5. #释放
  6. mutex.release()
  7. 其中,锁定方法acquire可以有一个超时时间的可选参数timeout。如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。

锁的理论

acquire 表示拿锁

最先拿到钥匙的,它会做几件事情:

1.用钥匙开门

2.把钥匙带进门里

3.把门反锁

看下图

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952420-1e225c2c-1a1b-44d8-bab4-231cedab1536.png)

理论上来讲,进程一般是异步的

但是加了锁之后,就变成同步了

那么谁先拿到钥匙呢?满足以下 2 个条件:

1.操作系统先响应的进程

2.当时没有时间片轮询,刚好就是它

lock.acquire()和 lock.release()之间的代码,表示被锁住了。

看 join 效果

  1. import os
  2. import time
  3. import random
  4. from multiprocessing import Process
  5. def work(n):
  6. print('%s: %s is running' %(n,os.getpid()))
  7. time.sleep(random.random())
  8. print('%s:%s is done' %(n,os.getpid()))
  9. if __name__ == '__main__':
  10. for i in range(5):
  11. p=Process(target=work,args=(i,))
  12. p.start()
  13. p.join()

执行输出:

0: 17348 is running

0:17348 is done

1: 18164 is running

1:18164 is done

2: 18160 is running

2:18160 is done

3: 17652 is running

3:17652 is done

4: 8340 is running

4:8340 is done

如果锁加在进程的开始和结束,参考 work 函数,它是同步的

join 也是同步的,但是,他们之间的区别在于

锁执行时是无序的,join 是有序的

由并发变成了串行,牺牲了运行效率,但避免了竞争。

上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。

总结:

同步控制:

只要用到了锁 锁之间的代码就会变成同步的

锁 :控制一段代码 同一时间 只能被一个进程执行

抢票的例子:

比如 12306,大家都经历过

模拟数据库,创建一个文件 ticket,内容如下:

  1. {"count":1}

注意一定要用双引号,不然 json 无法识别

主程序代码如下:

  1. import json
  2. from multiprocessing import Process
  3. def check_ticket(i):
  4. with open('ticket') as f:
  5. ticket_count = json.load(f)
  6. print('person%s'%i,ticket_count['count'])
  7. if __name__ == '__main__':
  8. for i in range(5):
  9. Process(target=check_ticket,args=(i,)).start()

执行输出:

person1 1

person0 1

person3 1

person2 1

person4 1

开始买票,买票的时候,可能有网络延时

收到请求之后,从数据库中读取数据

当你发票还有余票时,把票减少这件事情记录下来

中间会经历网络延时

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952489-fc570ad1-13c7-4686-b3cd-2186c7628eac.png)
  1. import json
  2. import time
  3. import random
  4. from multiprocessing import Process
  5. def buy_ticket(i): # 购票
  6. with open('ticket') as f: # 读取文件
  7. tick_count = json.load(f) # 反序列化
  8. time.sleep(random.random()) # 读取延时
  9. if tick_count['count'] > 0: # 当余票小于0时
  10. print('person%s购票成功'%i)
  11. tick_count['count'] -= 1 # 票数减1
  12. else:
  13. print('余票不足,person%s购票失败'%i)
  14. time.sleep(random.random()) # 写入延时
  15. with open('ticket','w') as f:
  16. json.dump(tick_count,f) # 写入文件
  17. if __name__ == '__main__':
  18. for i in range(5):
  19. Process(target=buy_ticket,args=(i,)).start()

模拟这 2 次延时

是因为服务器和数据库不在一台机器上面,它们之间交互数据,必然有延时

上面的代码只模拟了读取延时和写入延时,没有模拟请求延时

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952433-d1bacd05-20f6-4059-b2b8-749405dd241e.png)

执行输出:

person3 购票成功

person0 购票成功

person4 购票成功

person1 购票成功

person2 购票成功

从输出结果上来看,库存只有 1 张票,但是 5 个人都购票成功了。这显然是不合理的!

这就造成了数据不安全

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952378-395cacea-6d58-4b85-b7d4-dccca6afc08b.png)

怎么解决呢?加锁

  1. import json
  2. import time
  3. import random
  4. from multiprocessing import Process,Lock
  5. def buy_ticket(i,lock): # 购票
  6. lock.acquire() #取得锁
  7. with open('ticket') as f: # 读取文件
  8. tick_count = json.load(f) # 反序列化
  9. time.sleep(random.random()) # 读取延时
  10. if tick_count['count'] > 0: # 当余票小于0时
  11. print('person%s购票成功'%i)
  12. tick_count['count'] -= 1 # 票数减1
  13. else:
  14. print('余票不足,person%s购票失败'%i)
  15. time.sleep(random.random()) # 写入延时
  16. with open('ticket','w') as f:
  17. json.dump(tick_count,f) # 写入文件
  18. lock.release() #释放锁
  19. if __name__ == '__main__':
  20. lock = Lock() #创建锁
  21. for i in range(5): # 模拟5个用户抢票
  22. Process(target=buy_ticket,args=(i,lock)).start()

将文件 ticket 的 count 数字改为 1

  1. {"count": 1}

执行程序输出:

person0 购票成功

余票不足,person1 购票失败

余票不足,person2 购票失败

余票不足,person3 购票失败

从结果上来看,是正确的。保证了数据的安全性。

查数据,不涉及数据安全,因为没有修改。

总结:

  1. #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
  2. 虽然可以用文件共享数据实现进程间通信,但问题是:
  3. 1.效率低(共享数据基于文件,而文件是硬盘上的数据)
  4. 2.需要自己加锁处理
  5. #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
  6. 队列和管道都是将数据存放于内存中
  7. 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
  8. 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

信号量 —— multiprocess.Semaphore(了解)

信号量介绍Semaphore

  1. 互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据
  2. 假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
  3. 实现:
  4. 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
  5. 信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

上面讲的锁一个,现在锁变成一串了,数量由你来控制

下面有一个小 KTV,只能容纳 4 个人,第 5 个人,就没有钥匙了

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952589-e421b0d2-38b5-4cae-b265-083d96bbc3e9.png)

先不用信号量

  1. #KTV 4个人
  2. import time
  3. import random
  4. from multiprocessing import Process,Semaphore
  5. def ktv(i):
  6. print('person %s 进来唱歌了'%i)
  7. time.sleep(random.randint(1,5))
  8. print('person %s 从ktv出去了'%i)
  9. if __name__ == '__main__':
  10. for i in range(6): # 模拟6个人
  11. Process(target=ktv,args=(i,)).start()

执行输出:

person 0 进来唱歌了

person 1 进来唱歌了

person 2 进来唱歌了

person 3 进来唱歌了

person 4 进来唱歌了

person 5 进来唱歌了

person 3 从 ktv 出去了

person 4 从 ktv 出去了

person 2 从 ktv 出去了

person 1 从 ktv 出去了

person 0 从 ktv 出去了

person 5 从 ktv 出去了

结果是有问题的,6 个人,都可以进去

使用信号量来实现

  1. #KTV 4个人
  2. import time
  3. import random
  4. from multiprocessing import Process,Semaphore
  5. def ktv(i,sem):
  6. sem.acquire() #取得锁
  7. print('person %s 进来唱歌了'%i)
  8. time.sleep(random.randint(1,5))
  9. print('person %s 从ktv出去了'%i)
  10. sem.release() #释放锁
  11. if __name__ == '__main__':
  12. sem = Semaphore(4) #初始化信号量,数量为4
  13. for i in range(6): # 模拟6个人
  14. Process(target=ktv,args=(i,sem)).start()

执行输出:

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952362-2f991e8a-b424-4122-aa1b-36e66360f642.png)

在同一时间,最多有 4 个人进去

acquire()是一个阻塞行为

信号量和锁有点类似

那么它们之间的区别在于:

信号量,相当于计数器

它是锁+计数器

调用 acquire() 计数器-1

当计数器到 0 时,再调用 acquire() 就会阻塞,直到其他线程来调用 release()

调用 release() 计数器+1

事件 —— multiprocess.Event(了解)

时间介绍

  1. python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 setwaitclear
  2. 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
  3. clear:将“Flag”设置为False
  4. set:将“Flag”设置为True

著名的计算机模型:红绿灯

车 比方是一个进程,wait() 等红灯

根据状态变化,wait 遇到 true 信号,就非阻塞

遇到 False,就阻塞

交通灯 也是有一个进程 红灯->False 绿灯->True

这里没有黄灯

事件有几个方法:

wait 的方法 根据一个状态来决定自己是否要阻塞

状态相关的方法

set 将状态改为 True

clear 将状态改为 False

is_set 判断当前的状态是否为 True

先来看几行代码

  1. from multiprocessing import Event
  2. e = Event() #创建一个事件的对象
  3. print(e.is_set()) # 在事件的创世之初,状态为False

执行输出:False

在看一个列子

  1. from multiprocessing import Event
  2. e = Event() #创建一个事件的对象
  3. print(e.is_set()) # 在事件的创世之初,状态为False
  4. e.wait()
  5. print('1')

执行输出:False,然后程序一致卡着

为啥呢?因为状态值为 False,那么当程序执行 event.wait 方法时就会阻塞

再来

  1. from multiprocessing import Event
  2. e = Event() #创建一个事件的对象
  3. print(e.is_set()) # 在事件的创世之初,状态为False
  4. e.set() # 将状态设置为True
  5. e.wait()
  6. print(e.is_set()) # 查看状态
  7. print('1')

执行输出:

False

True

1

很快就输出了,说明没有阻塞

wait 是否阻塞,取决于当前的状态

模拟红绿灯

  1. import time
  2. def traffic_light():
  3. while True:
  4. print('红灯亮')
  5. time.sleep(2)
  6. print('绿灯亮')
  7. time.sleep(2)
  8. break
  9. traffic_light()

执行输出:

红灯亮

绿灯亮

为了好看一点,加点颜色

  1. import time
  2. def traffic_light():
  3. while True:
  4. print('\033[1;31m红灯亮\033[0m')
  5. time.sleep(2)
  6. print('\033[1;32m绿灯亮\033[0m')
  7. time.sleep(2)
  8. break
  9. traffic_light()

执行输出:

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952382-25b81e36-4558-470d-bb64-98d073ed03ac.png)

输出挺专业的哈

下面来创建车

  1. import time
  2. from multiprocessing import Process,Event
  3. def traffic_light():
  4. while True:
  5. print('\033[1;31m红灯亮\033[0m')
  6. time.sleep(2)
  7. print('\033[1;32m绿灯亮\033[0m')
  8. time.sleep(2)
  9. break
  10. def car(i):
  11. print('car%s通过路口'%i)
  12. if __name__ == '__main__':
  13. for i in range(1,6): # 创建5辆车
  14. Process(target=car,args=(i,)).start()

执行输出:

car1 通过路口

car2 通过路口

car3 通过路口

car5 通过路口

car4 通过路口

现在还没有红绿灯,加一个红绿灯进程

  1. import time
  2. from multiprocessing import Process,Event
  3. def traffic_light():
  4. while True:
  5. print('\033[1;31m红灯亮\033[0m')
  6. time.sleep(2)
  7. print('\033[1;32m绿灯亮\033[0m')
  8. time.sleep(2)
  9. break
  10. def car(i):
  11. print('car%s通过路口'%i)
  12. if __name__ == '__main__':
  13. Process(target=traffic_light).start()
  14. for i in range(1,6):
  15. Process(target=car,args=(i,)).start()

执行输出:

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952345-7022f1b4-bb48-459c-8ac7-3cc5c39569d8.png)

发现 5 辆车都闯红灯了…

加一个事件:

  1. import time
  2. import random
  3. from multiprocessing import Process,Event
  4. def traffic_light(e,count): # 交通灯
  5. while True:
  6. while True:
  7. # 事件在创建的时候,e的状态是False,相当于程序中的红灯
  8. print('\033[1;31m红灯亮\033[0m')
  9. time.sleep(2) # 红灯亮2秒
  10. # 这里e.is_set()是False,所以not e.is_set()就是True
  11. if not e.is_set():e.set() # 判断为True时,变绿灯
  12. print('\033[1;32m绿灯亮\033[0m')
  13. time.sleep(2) # 绿灯亮2秒
  14. # 这里e.is_set()是True
  15. if e.is_set():e.clear() # 判断为True时,将状态设置为False
  16. count += 1 # 自增1
  17. if count == 5: # 判断为5时,跳出内层循环
  18. break
  19. break #跳出内层循环
  20. def car(i,e): # 汽车,感知状态的变化
  21. if not e.is_set(): # 当前这个事件状态是False
  22. print('car%s正在等待'%i) # 这辆车正在等待通过路口
  23. e.wait() # 阻塞,直到有一个e.set行为。正在等红灯
  24. print('car%s通过路口'%i) # 等待状态为True,才能通过
  25. if __name__ == '__main__':
  26. e = Event() # 创建事件,默认状态为False
  27. Process(target=traffic_light,args=(e,0)).start() # 启动红灯进程
  28. for i in range(1,6): # 模拟5个人
  29. time.sleep(random.randrange(0,5,2)) # 大于等于0且小于5之间的奇数
  30. Process(target=car,args=(i,e)).start() # 启动交通灯进程

执行输出:

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952374-b16a67d2-e414-4b6a-8936-1d231eba033f.png)

从结果上来看,挺完美的。

每辆车独立占用一个进程,交通灯也是一个进程,之前学的进程是隔离的,那么 car 进程为什么能感知到?

是因为进程之间用了 socket 网络通信

内部有一套通讯机制

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952771-39852ca9-abd7-4f56-87bf-1f2b91910c43.png)

注意:e.is_set() 状态不是固定的,每隔 2 秒,会变化一次。

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952442-4d7aa27d-5795-43d3-9933-3722516ee375.png)

控制信号灯的子进程、事件对象 e、car 子进程,都依赖于状态

wait 只判断 True 和 False

也就是红灯停,绿灯行

等待 2 秒之后,就切换一个状态

相当于执行了下面的代码:

  1. flag = False
  2. while True:
  3. if flag is True:
  4. flag = False
  5. else:
  6. flag = True
  7. time.sleep(2)

进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)

进程间通信

IPC(Inter-Process Communication)

IPC 说的就是进程之间的通信,它只是缩写而已

队列

概念介绍

之前学的 queue 和现在要提到的 queue 之间的区别

import queue

它能维护一个先进先出的秩序,它不能进行 IPC

from multiprocessing import Queue,Process

能维护一个先进先出的秩序,也能进行 IPC

创建共享的进程队列,Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。

遵循先进先出原则

  1. Queue([maxsize])
  2. 创建共享的进程队列。
  3. 参数 maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
  4. 底层队列使用管道和锁定实现。

方法介绍

  1. Queue([maxsize])
  2. 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
  3. Queue的实例q具有以下方法:
  4. q.get( [ block [ ,timeout ] ] )
  5. 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
  6. q.get_nowait( )
  7. q.get(False)方法。
  8. q.put(item [, block [,timeout ] ] )
  9. item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
  10. q.qsize()
  11. 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
  12. q.empty()
  13. 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
  14. q.full()
  15. 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

其他方法(了解)

  1. q.close()
  2. 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
  3. q.cancel_join_thread()
  4. 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
  5. q.join_thread()
  6. 连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

简单用法:

  1. from multiprocessing import Process,Queue
  2. q = Queue() #创建共享的进程队列
  3. q.put(1) # 将一个值放入队列
  4. q.put(2)
  5. q.put('aaa')
  6. print(q.get()) # 返回q中的一个项目

执行输出:1

因为 1 先进去,所以它先出来。

  1. from multiprocessing import Process,Queue
  2. def wahaha(q):
  3. print(q.get())
  4. if __name__ == '__main__':
  5. q = Queue() #创建共享的进程队列
  6. Process(target=wahaha,args=(q,)).start()
  7. q.put(1)

执行输出:1

因为主程序先执行

双向通信

既能取值,也能增加值

  1. import time
  2. from multiprocessing import Process,Queue
  3. def wahaha(q):
  4. print(q.get()) # 取值
  5. q.put('aaa') # 增加一个aaa
  6. if __name__ == '__main__':
  7. q = Queue() #创建共享的进程队列
  8. Process(target=wahaha,args=(q,)).start()
  9. q.put(1)
  10. time.sleep(0.5) # 等待0.5秒,让子程序执行完
  11. print(q.get()) # 取值

执行输出:

1

aaa

单看队列用法

  1. '''
  2. multiprocessing模块支持进程间通信的两种主要形式:管道和队列
  3. 都是基于消息传递实现的,但是队列接口
  4. '''
  5. from multiprocessing import Queue
  6. q=Queue(3)
  7. #put ,get ,put_nowait,get_nowait,full,empty
  8. q.put(3)
  9. q.put(3)
  10. q.put(3)
  11. # q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
  12. # 如果队列中的数据一直不被取走,程序就会永远停在这里。
  13. try:
  14. q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
  15. except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
  16. print('队列已经满了')
  17. # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
  18. print(q.full()) #满了
  19. print(q.get())
  20. print(q.get())
  21. print(q.get())
  22. # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
  23. try:
  24. q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
  25. except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
  26. print('队列已经空了')
  27. print(q.empty()) #空了

上面这个例子还没有加入进程通信,只是先来看看队列为我们提供的方法,以及这些方法的使用和现象。

子进程发送数据给父进程

  1. import time
  2. from multiprocessing import Process, Queue
  3. def f(q):
  4. q.put([time.asctime(), 'from Eva', 'hello']) #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
  5. if __name__ == '__main__':
  6. q = Queue() #创建一个Queue对象
  7. p = Process(target=f, args=(q,)) #创建一个进程
  8. p.start()
  9. print(q.get())
  10. p.join()

上面是一个 queue 的简单应用,使用队列 q 对象调用 get 函数来取得队列中最先进入的数据。 接下来看一个稍微复杂一些的例子:

批量生产数据放入队列再批量获取结果 x

  1. import os
  2. import time
  3. import multiprocessing
  4. # 向queue中输入数据的函数
  5. def inputQ(queue):
  6. info = str(os.getpid()) + '(put):' + str(time.asctime())
  7. queue.put(info)
  8. # 向queue中输出数据的函数
  9. def outputQ(queue):
  10. info = queue.get()
  11. print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))
  12. # Main
  13. if __name__ == '__main__':
  14. multiprocessing.freeze_support()
  15. record1 = [] # store input processes
  16. record2 = [] # store output processes
  17. queue = multiprocessing.Queue(3)
  18. # 输入进程
  19. for i in range(10):
  20. process = multiprocessing.Process(target=inputQ,args=(queue,))
  21. process.start()
  22. record1.append(process)
  23. # 输出进程
  24. for i in range(10):
  25. process = multiprocessing.Process(target=outputQ,args=(queue,))
  26. process.start()
  27. record2.append(process)
  28. for p in record1:
  29. p.join()
  30. for p in record2:
  31. p.join()

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

生产者消费者模型

消费者 消费数据 吃包子

生产者 产生数据的人 做包子

假如生产了 10 个包子,但是来了 15 个人,供不应求了。

这就产生了供销矛盾,那怎么解决呢?

增加做包子的人

或者采用同步模式 :做一个包子 卖一包子

再举一个例子:

生产数据 在淘宝买东西 —- 产生消费者行为数据

消费数据 阿里巴巴 —- 即时性要求非常高 必须要快速把用户生产的数据消费完

看下图

笼屉,只能放 100 个包子

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952363-54e3ef4f-feee-4eac-8393-4316c1021bab.png)

如果蒸包子的数量过多,没人买了,那么就需要减少蒸包子的人

如果蒸包子的根据买包子的人,来生产包子,就比较完美了

queue 队列就是笼屉

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952313-fddf8c71-5d98-4bda-90e1-db7e9c62cd83.png)

基于队列实现生产者消费者模型

举例

  1. import time
  2. import random
  3. from multiprocessing import Process,Queue
  4. def producer(q): #生产者
  5. for i in range(5): # 生产5个包子
  6. time.sleep(random.random()) # 模拟生产包子时间
  7. q.put('包子%s'%i)
  8. def consumer(q): # 消费者
  9. for i in range(5): # 5个消费者
  10. print(q.get()) # 买一个包子
  11. time.sleep(random.random()) # 模拟吃包子时间
  12. if __name__ == '__main__':
  13. q = Queue() #创建共享的进程队列
  14. p1 = Process(target=producer,args=(q,))
  15. p2 = Process(target=consumer, args=(q,))
  16. p1.start() # 启动进程
  17. p2.start()

执行输出:

包子 0

包子 1

包子 2

包子 3

包子 4

增加颜色,显示更明显

  1. import time
  2. import random
  3. from multiprocessing import Process,Queue
  4. def producer(q): #生产者
  5. for i in range(1,6): # 生产5个包子
  6. time.sleep(random.random()) # 模拟生产包子时间
  7. print('\033[1;31m生产了包子%s\033[0m' % i)
  8. def consumer(q): # 消费者
  9. for i in range(1,6): # 5个消费者
  10. print('\033[1;32m消费了包子%s\033[0m' % i)
  11. time.sleep(random.uniform(1,2)) # 模拟吃包子时间
  12. if __name__ == '__main__':
  13. q = Queue() #创建共享的进程队列
  14. p1 = Process(target=producer,args=(q,))
  15. p2 = Process(target=consumer, args=(q,))
  16. p1.start() # 启动进程
  17. p2.start()

执行输出:

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952327-3d5eb9ad-4da4-45c2-b7e0-1ff57e7f519e.png)

发现供需不平衡

需要改变供需平衡

  1. import time
  2. import random
  3. from multiprocessing import Process,Queue
  4. def producer(q): #生产者
  5. for i in range(1,11): # 生产5个包子
  6. time.sleep(random.random()) # 模拟生产包子时间
  7. q.put('包子%s'%i) # 生产包子,将包子放入队列,比如包子1
  8. print('\033[1;31m生产了包子%s\033[0m' % i)
  9. def consumer(q): # 消费者
  10. for i in range(1,6): # 5个消费者
  11. food = q.get() # 买一个包子
  12. print('\033[1;32m消费了包子%s\033[0m' % food)
  13. time.sleep(random.uniform(1,2)) # 模拟吃包子时间
  14. if __name__ == '__main__':
  15. q = Queue() #创建共享的进程队列
  16. p1 = Process(target=producer,args=(q,))
  17. p2 = Process(target=consumer, args=(q,))
  18. p3 = Process(target=consumer, args=[q]) # 增加一个消费者进程
  19. p1.start() # 启动进程
  20. p2.start()
  21. p3.start()

执行输出:

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1484428/1597994952351-444d86ee-363a-4d24-aad1-e01cde72fbd4.png)

上面是通过消费者和包子数量对等,解决供需平衡问题的。

其他例子:

基于队列实现生产者消费者模型

  1. from multiprocessing import Process,Queue
  2. import time,random,os
  3. def consumer(q):
  4. while True:
  5. res=q.get()
  6. time.sleep(random.randint(1,3))
  7. print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
  8. def producer(q):
  9. for i in range(10):
  10. time.sleep(random.randint(1,3))
  11. res='包子%s' %i
  12. q.put(res)
  13. print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
  14. if __name__ == '__main__':
  15. q=Queue()
  16. #生产者们:即厨师们
  17. p1=Process(target=producer,args=(q,))
  18. #消费者们:即吃货们
  19. c1=Process(target=consumer,args=(q,))
  20. #开始
  21. p1.start()
  22. c1.start()
  23. print('主')

此时的问题是主进程永远不会结束,原因是:生产者 p 在生产完后就结束了,但是消费者 c 在取空了 q 之后,则一直处于死循环中且卡在 q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以 break 出死循环。

改良版——生产者消费者模型

  1. from multiprocessing import Process,Queue
  2. import time,random,os
  3. def consumer(q):
  4. while True:
  5. res=q.get()
  6. if res is None:break #收到结束信号则结束
  7. time.sleep(random.randint(1,3))
  8. print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
  9. def producer(q):
  10. for i in range(10):
  11. time.sleep(random.randint(1,3))
  12. res='包子%s' %i
  13. q.put(res)
  14. print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
  15. q.put(None) #发送结束信号
  16. if __name__ == '__main__':
  17. q=Queue()
  18. #生产者们:即厨师们
  19. p1=Process(target=producer,args=(q,))
  20. #消费者们:即吃货们
  21. c1=Process(target=consumer,args=(q,))
  22. #开始
  23. p1.start()
  24. c1.start()
  25. print('主')

注意:结束信号 None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

主进程在生产者生产完毕后发送结束信号None

  1. from multiprocessing import Process,Queue
  2. import time,random,os
  3. def consumer(q):
  4. while True:
  5. res=q.get()
  6. if res is None:break #收到结束信号则结束
  7. time.sleep(random.randint(1,3))
  8. print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
  9. def producer(q):
  10. for i in range(2):
  11. time.sleep(random.randint(1,3))
  12. res='包子%s' %i
  13. q.put(res)
  14. print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
  15. if __name__ == '__main__':
  16. q=Queue()
  17. #生产者们:即厨师们
  18. p1=Process(target=producer,args=(q,))
  19. #消费者们:即吃货们
  20. c1=Process(target=consumer,args=(q,))
  21. #开始
  22. p1.start()
  23. c1.start()
  24. p1.join()
  25. q.put(None) #发送结束信号
  26. print('主')

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很 low 的方式去解决

多个消费者的例子:有几个消费者就需要发送几次结束信号

  1. from multiprocessing import Process,Queue
  2. import time,random,os
  3. def consumer(q):
  4. while True:
  5. res=q.get()
  6. if res is None:break #收到结束信号则结束
  7. time.sleep(random.randint(1,3))
  8. print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
  9. def producer(name,q):
  10. for i in range(2):
  11. time.sleep(random.randint(1,3))
  12. res='%s%s' %(name,i)
  13. q.put(res)
  14. print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
  15. if __name__ == '__main__':
  16. q=Queue()
  17. #生产者们:即厨师们
  18. p1=Process(target=producer,args=('包子',q))
  19. p2=Process(target=producer,args=('骨头',q))
  20. p3=Process(target=producer,args=('泔水',q))
  21. #消费者们:即吃货们
  22. c1=Process(target=consumer,args=(q,))
  23. c2=Process(target=consumer,args=(q,))
  24. #开始
  25. p1.start()
  26. p2.start()
  27. p3.start()
  28. c1.start()
  29. p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
  30. p2.join()
  31. p3.join()
  32. q.put(None) #有几个消费者就应该发送几次结束信号None
  33. q.put(None) #发送结束信号
  34. print('主')

明日默写:

抢票

  1. import json
  2. import time
  3. import random
  4. from multiprocessing import Process,Lock
  5. def check_ticket(i):
  6. with open('ticket') as f:
  7. ticket_count = json.load(f)
  8. print('person%s查询当前余票 :'%i,ticket_count['count'])
  9. def buy_ticket(i,lock):
  10. check_ticket(i)
  11. lock.acquire()
  12. with open('ticket') as f:
  13. ticket_count = json.load(f)
  14. time.sleep(random.random())
  15. if ticket_count['count'] > 0:
  16. print('person%s购票成功'%i)
  17. ticket_count['count'] -= 1
  18. else:
  19. print('余票不足,person%s购票失败'%i)
  20. time.sleep(random.random())
  21. with open('ticket','w') as f:
  22. json.dump(ticket_count,f)
  23. if __name__ == '__main__':
  24. lock = Lock()
  25. for i in range(10):
  26. Process(target=buy_ticket,args=[i,lock]).start()

红绿灯

  1. import time
  2. import random
  3. from multiprocessing import Process,Event
  4. def traffic_light(e,count): # 交通灯
  5. while True:
  6. while True:
  7. # 事件在创建的时候,e的状态是False,相当于程序中的红灯
  8. print('\033[1;31m红灯亮\033[0m')
  9. time.sleep(2) # 红灯亮2秒
  10. # 这里e.is_set()是False,所以not e.is_set()就是True
  11. if not e.is_set():e.set() # 判断为True时,变绿灯
  12. print('\033[1;32m绿灯亮\033[0m')
  13. time.sleep(2) # 绿灯亮2秒
  14. # 这里e.is_set()是True
  15. if e.is_set():e.clear() # 判断为True时,将状态设置为False
  16. count += 1 # 自增1
  17. if count == 5: # 判断为5时,跳出内层循环
  18. break
  19. break #跳出内层循环
  20. def car(i,e): # 汽车,感知状态的变化
  21. if not e.is_set(): # 当前这个事件状态是False
  22. print('car%s正在等待'%i) # 这辆车正在等待通过路口
  23. e.wait() # 阻塞,直到有一个e.set行为。正在等红灯
  24. print('car%s通过路口'%i) # 等待状态为True,才能通过
  25. if __name__ == '__main__':
  26. e = Event() # 创建事件,默认状态为False
  27. Process(target=traffic_light,args=(e,0)).start() # 启动红灯进程
  28. for i in range(1,6): # 模拟5个人
  29. time.sleep(random.randrange(0,5,2)) # 大于等于0且小于5之间的奇数
  30. Process(target=car,args=(i,e)).start() # 启动交通灯进程