author: UpDown published: True created: 2022年5月10日20点04分 tags: Done version: Outer
参考:https://blog.updown.world/articles/python/pythonbook4/index.html
并发,并行
- 并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)
- 并行:指的是任务数小于等于cpu核数,即任务真的是一起执行的
- 真正的”并行”只能在多核CPU上实现,现实中由于任务数量远远多于CPU的核心数量,所以基本上都是“并发”。 操作系统会自动把很多任务轮流调度到每个核心上执行。、
多线程
threading
在Python中如果想使用线程实现多任务,可以使用thread模块 但是它比较底层,即意味着过程较为复杂不方便使用;推荐使用threading模块,它是对thread做了一些包装的,可以更加方便使用 ```python1导包
import threading import time
def buyfood(num): print(“学生%d买饭” % num) time.sleep(2)
if name == “main“: for i in range(5):
#2创建Thread对象 把任务添加进去 target的值是要执行的方法,args的值是方法的参数
t = threading.Thread(target=buyfood, args=(i,))
#3.开启线程执行任务
t.start() # 启动线程,即让线程开始执行
<a name="QGVcq"></a>
### 同时执行多个不同的任务
```python
import threading
from time import sleep, ctime
def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)
def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)
print('---开始---:%s' % ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
#sleep(5) # 屏蔽此行代码,试试看,程序是否会立马结束?
print('---结束---:%s' % ctime())
运行顺序的不确定
当python程序中有多个任务需要被执行时,这些任务需要等待操作系统的调度(即操作系统安排接下来要执行哪个任务),因为每次运行程序时的环境(例如上次运行时 除了这个python程序之外还有QQ、微信在运行,而这次运行时没有QQ只有微信在运行都会影响操作系统的调度策略)不一样,所以多次运行同一个python程序时任务执行的先后顺序是不同的
threading.enumerate()
- 使用threading.enumerate()能够得到当前程序在运行时,所有的线程信息,以列表的方式返回
- 我们可以让主线程(程序运行后的默认线程),判断threading.enumerate()返回的线程数量,如果只有1个线程,那么就表示当前主线程自己,意味着没有其他的子线程(使用threading创建的那些线程),此时就可以结束主线程,只要主线程结束 那么这个程序也就结束了
传递命名参数
在使用Thread创建线程的时候,args是传递一个元组,元组中的数据个数与target指定的函数形参个数、顺序一直,kwargs是一个字典 里面的key当做形参中的变量名字,value是给这个形参传递的数值 ```java from threading import Thread import time
def work1(num1, num2, m): print(“——in work1—num1=%d,num2=%d,m=%d-“ % (num1, num2, m))
def work2(num1, num2, num3, n): print(“——in work1—num1=%d,num2=%d,num3=%d,n=%d-“ % (num1, num2, num3, n))
t1 = Thread(target=work1, args=(11, 22), kwargs={“m”: 100}) t1.start()
t2 = Thread(target=work2, args=(33, 44, 55), kwargs={“n”: 200}) t2.start()
“”” ——in work1—num1=11,num2=22,m=100- ——in work1—num1=33,num2=44,num3=55,n=200- “””
<a name="QZ0Mb"></a>
### 多线程的UDP聊天程序
```java
import socket
import threading
def send_msg(udp_socket):
"""获取键盘数据,并将其发送给对方"""
while True:
print("1: 发送数据")
print("2: 退出程序")
op = input("请输入操作序号:")
if op == "1":
# 1. 输入对方的ip地址
dest_ip = input("\n请输入对方的ip地址:")
# 2. 输入对方的port
dest_port = int(input("\n请输入对方的port:"))
while True:
# 3. 从键盘输入数据
msg = input("\n请输入要发送的数据:")
if msg:
# 4. 发送数据
udp_socket.sendto(msg.encode("utf-8"), (dest_ip, dest_port))
else:
# 要是没有输入内容则认为是要重新输入ip、port
break
elif op == "2":
# 结束发送
break
# 结束,关闭套接字
udp_socket.close()
def recv_msg(udp_socket):
"""接收数据并显示"""
while True:
try:
# 1. 接收数据
recv_msg = udp_socket.recvfrom(1024)
except:
break
else:
# 2. 解码
recv_ip = recv_msg[1]
recv_msg = recv_msg[0].decode("utf-8")
# 3. 显示接收到的数据
print("接收到来自于%s的数据:%s" % (str(recv_ip), recv_msg))
def main():
# 1. 创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 2. 绑定本地信息
udp_socket.bind(("", 7890))
# 3. 创建一个新的线程,用来接收数据
udp_r = threading.Thread(target=recv_msg, args=(udp_socket,))
# 4. 创建一个新的线程,用来发送数据
udp_s = threading.Thread(target=send_msg, args=(udp_socket,))
# 5. 运行创建的子线程
udp_r.start()
udp_s.start()
if __name__ == "__main__":
main()
- udp是全双工的,可以同时进行收发数据
- 创建了一个udp套接字可以传递给多个线程同时使用,一个收数据,一个发数据
如果在某个线程中将这个套接字close,那么意味着这个udp的套接字不能再使用了
类线程
封装性更好的一种创建线程的方式是:
定义一个新的类,继承Thread类
- 在这个类中实现run方法
- 在run方法中写如要执行的代码
当使用这个类创建一个对象后,调用对象的start方法就可以让这个线程执行,且会自动执行run方法的代码
python的threading.Thread类有一个run方法,用于定义线程的功能函数可以在自己的线程类中覆盖该方法。
- 创建线程对象后,调用start方法,可以启动该线程
- 当该线程获得执行的机会时,就会调用run方法执行线程的功能
- run方法中可以编写本线程需要执行的代码
- 每个线程默认有一个名字,尽管上面的例子中没有指定线程对象的name,但是python会自动为线程指定一个名字。
- 当线程的run()方法结束时表示该线程结束
- 既然run方法是个示例方法,那么也就意味着可以调用本类中的其它实例方法,从而能够编写出较为复杂的功能 ```java import threading import time
class MyThread(threading.Thread): def run(self): for i in range(3): msg = “I’m “+ self.name + ‘ @ ‘+str(i) # name属性中保存的是当前线程的名字 print(msg) time.sleep(1)
if name == ‘main‘: t = MyThread() t.start()
<a name="mbKFV"></a>
### 多线程并发TCP服务器
> 来源:[https://blog.updown.world/articles/python/pythonbook4/01day/07-%E6%A1%88%E4%BE%8B%EF%BC%9A%E5%B9%B6%E5%8F%91TCP%E6%9C%8D%E5%8A%A1%E5%99%A8.html](https://blog.updown.world/articles/python/pythonbook4/01day/07-%E6%A1%88%E4%BE%8B%EF%BC%9A%E5%B9%B6%E5%8F%91TCP%E6%9C%8D%E5%8A%A1%E5%99%A8.html)
1. 可以通过定义一个新的类,继承threading.Thread的方式创建线程
1. 创建这个线程对象的时候,可以像使用普通的类一样,给它的__init__方法创建参数
1. 在__init__方法中通过super().__init__()调用被覆盖的父类方法,能够保证父类需要进行的准备工作能够正常执行
1. 可以在__del__方法中调用close()关闭套接字
```java
import socket
import threading
class HandleData(threading.Thread):
"""用来接收和处理数据的线程"""
def __init__(self, client_socket):
super().__init__()
# 初始化时传入要处理的客户端对象
self.client_socket = client_socket
def run(self):
# 接收/发送数据
while True:
# 接收数据
recv_content = self.client_socket.recv(1024)
if len(recv_content) != 0:
print(recv_content)
# 把收到的数据返回
self.client_socket.send(recv_content)
else:
# 如果收到空数据 就结束
self.client_socket.close()
break
def __del__(self):
# 线程对象销毁时 结束socket
self.client_socket.close()
class TCPServer(threading.Thread):
"""tcp服务器的线程 ,接收不同客户端的连接"""
def __init__(self, port):
# 注意 这里必须调用父类的初始化方法,否则很多重要的初始化就不会执行
# threading.Thread.__init__(self)
super().__init__()
# 创建套接字
self.server_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定本地信息
self.server_s.bind(("", port))
# 将套接字由默认的主动链接模式改为被动模式(监听模块)
self.server_s.listen(128)
def run(self):
# 开启循环 可以一直接收不同的客户端
while True:
# 等待客户端进行链接
new_s, client_info = self.server_s.accept()
print('连接%s成功' % str(client_info))
# 创建一个处理客户端的线程对象,并且开启线程
HandleData(new_s).start()
def __del__(self):
# 关闭套接字
self.server_s.close()
def main():
# 创建服务器的线程对象
tcp_server = TCPServer(12000) # 12000表示TCP要绑定的端口
# 开启服务器线程
tcp_server.start()
if __name__ == '__main__':
main()
共享全局变量
- 如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确
python3.10开发组修改了源码,导致部分线程安全了 来源:https://stackoverflow.com/questions/69993959/python-threads-difference-for-3-10-and-others
假设两个线程t1和t2都要对全局变量g_num(默认是0)进行加1运算,t1和t2都各对g_num加10次,g_num的最终的结果应该为20。
但是由于是多线程同时操作,有可能出现下面情况:
- 在g_num=0时,t1取得g_num=0。此时系统把t1调度为”sleeping”状态,把t2转换为”running”状态,t2也获得g_num=0
- 然后t2对得到的值进行加1并赋给g_num,使得g_num=1
- 然后系统又把t2调度为”sleeping”,把t1转为”running”。线程t1又把它之前得到的0加1后赋值给g_num。
- 这样导致虽然t1和t2都对g_num加1,但结果仍然是g_num=1 ```python import threading import time
g_num = 0
def work1(num): global g_num for i in range(num): g_num += 1 print(“——in work1, g_num is %d—-“%g_num)
def work2(num): global g_num for i in range(num): g_num += 1 print(“——in work2, g_num is %d—-“%g_num)
print(“—-线程创建之前g_num is %d—-“%g_num)
t1 = threading.Thread(target=work1, args=(1000000,)) t1.start()
t2 = threading.Thread(target=work2, args=(1000000,)) t2.start()
while len(threading.enumerate()) != 1: time.sleep(1)
print(“2个线程对同一个全局变量操作之后的最终结果是:%s” % g_num)
<a name="g566z"></a>
### 互斥锁
> 参考:[https://blog.updown.world/articles/python/pythonbook4/01day/12-%E4%BA%92%E6%96%A5%E9%94%81.html](https://blog.updown.world/articles/python/pythonbook4/01day/12-%E4%BA%92%E6%96%A5%E9%94%81.html)
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制<br />线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁
<a name="WQkfg"></a>
#### 原理
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性
<a name="mncT2"></a>
#### Lock
```java
# 创建锁
mutex = threading.Lock()
# 锁定
mutex.acquire()
# 释放
mutex.release()
- 如果这个锁之前是没有上锁的,那么acquire不会阻塞(堵塞:理解为程序卡在这里等待某个条件满足)
- 如果在调用acquire对这个锁上锁之前 它已经被 其他线程上了锁,那么此时acquire会阻塞,直到这个锁被解锁为止 ```java import threading import time
g_num = 0
def test1(num): global g_num for i in range(num): mutex.acquire() # 上锁 g_num += 1 mutex.release() # 解锁
print("---test1---g_num=%d"%g_num)
def test2(num): global g_num for i in range(num): mutex.acquire() # 上锁 g_num += 1 mutex.release() # 解锁
print("---test2---g_num=%d"%g_num)
创建一个互斥锁
默认是未上锁的状态
mutex = threading.Lock()
创建2个线程,让他们各自对g_num加1000000次
p1 = threading.Thread(target=test1, args=(1000000,)) p1.start()
p2 = threading.Thread(target=test2, args=(1000000,)) p2.start()
等待计算完成
while len(threading.enumerate()) != 1: time.sleep(1)
print(“2个线程对同一个全局变量操作之后的最终结果是:%s” % g_num)
<a name="AQEb3"></a>
#### 小总结
锁的好处:
- 确保了某段关键代码同时只能由一个线程从头到尾完整地执行
锁的坏处:
- 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
- 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
<a name="VkbaT"></a>
# Queue
<a name="nwVcm"></a>
### 队列Queue
1. 先进先出(FIFO)
1. 可以存放任意类型数据
```java
import queue
q = queue.Queue()
q.put('11') # 存入字符串
q.put(22) # 存入整数
q.put({'num': 100}) # 存入字典
print(q.get()) # 11
print(q.get()) # 22
print(q.get()) # {'num': 100}
"""
11
22
{'num': 100}
"""
堆栈Queue
- 后进先出(LIFO)
- 可以存放任意数据类型 ```java import queue q = queue.LifoQueue() q.put(‘11’) # 存入字符串 q.put(22) # 存入整数 q.put({‘num’: 100}) # 存入字典
print(q.get()) # {‘num’: 100} print(q.get()) # 22 print(q.get()) # 11
<a name="TAWYT"></a>
### 优先级Queue
1. 存放的数据是元组类型,第1个元素表示优先级,第2个元素表示存储的数据
1. 优先级数字越小优先级越高
1. 数据优先级高的优先被取出
1. 用于VIP用户数据优先被取出场景,因为上面两种都要挨个取出
```java
import queue
q = queue.PriorityQueue()
q.put((10, 'Q'))
q.put((30, 'Z'))
q.put((20, 'A'))
print(q.get()) # (10, 'Q')
print(q.get()) # (20, 'A')
print(q.get()) # (30, 'Z')
带有聊天记录的UDP聊天程序(FiFo)
import socket
import threading
import queue
class SendMsg(threading.Thread):
def __init__(self, udp_socket, queue):
super().__init__()
self.udp_socket = udp_socket
self. queue = queue
def run(self):
"""获取键盘数据,并将其发送给对方"""
while True:
print("1: 发送数据")
print("2: 退出程序")
op = input("请输入操作序号:")
if op == "1":
# 1. 输入对方的ip地址
dest_ip = input("\n请输入对方的ip地址:")
# 2. 输入对方的port
dest_port = int(input("\n请输入对方的port:"))
while True:
# 3. 从键盘输入数据
msg = input("\n请输入要发送的数据:")
if msg:
# 4. 发送数据
self.udp_socket.sendto(msg.encode("utf-8"), (dest_ip, dest_port))
info = "<<<(%s, %d):%s\n" % (dest_ip, dest_port, msg)
self.queue.put(info)
else:
# 要是没有输入内容则认为是要重新输入ip、port
break
elif op == "2":
break
def __del__(self):
self.udp_socket.close()
class RecvMsg(threading.Thread):
def __init__(self, udp_socket, queue):
super().__init__()
self.udp_socket = udp_socket
self.queue = queue
def run(self):
"""接收数据并显示"""
while True:
try:
# 1. 接收数据
recv_msg = self.udp_socket.recvfrom(1024)
except:
break
else:
# 2. 解码
recv_ip = recv_msg[1]
recv_msg = recv_msg[0].decode("utf-8")
# 3. 显示接收到的数据
info = ">>>%s:%s\n" % (str(recv_ip), recv_msg)
print(info)
self.queue.put(info)
def __del__(self):
self.udp_socket.close()
class SaveChat(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
with open("./chat.txt", "a") as f:
chat_info = self.queue.get()
print("正在将(%s)写入到聊天记录文件中" % chat_info)
f.write(chat_info)
def main():
# 创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定本地信息
udp_socket.bind(("", 7890))
# 创建一个FiFo的队列
q = queue.Queue()
# 创建线程对象
udp_r = RecvMsg(udp_socket, q)
udp_s = SendMsg(udp_socket, q)
chat_thread = SaveChat(q)
# 运行创建的子线程
udp_r.start()
udp_s.start()
chat_thread.start()
if __name__ == "__main__":
main()
进程
在python中使用使用进程实现多任务的方式有3种:
import multiprocessing # 进程的包
def dance(): while True: print(“跳舞”) time.sleep(1)
def song(): while True: print(“唱歌”) time.sleep(1)
def main(): “””一边唱歌一边跳舞”””
# threading.Thread(target=dance).start()
#
# threading.Thread(target=song).start()
# 创建一个进程
process_dance = multiprocessing.Process(target=dance)
# 开启
process_dance.start()
# 创建一个进程
process_song = multiprocessing.Process(target=song)
# 开启
process_song.start()
if name == ‘main‘: main()
<a name="BdBMS"></a>
### multiprocessing常用方法
- start():启动子进程实例(创建子进程)
- is_alive():判断子进程是否还在活着
- join([timeout]):是否等待子进程执行结束,或等待多少秒
- terminate():不管任务是否完成,立即终止子进程
<a name="XRipa"></a>
### PID
pid是唯一标识进程的号
- os.getpid()获取当前进程的pid
- os.getppid()获取父进程的pid
1. 每个进程都有1个数字来标记,这个数字称之为进程号
1. Linux系统中查看PID的命令是ps
```python
# 唱歌与跳舞
import time
import os
import multiprocessing # 进程的包
def dance():
print('dance进程:%d,父进程:%d' % (os.getpid(),os.getppid()))
while True:
print("跳舞")
time.sleep(1)
def song():
print('song进程:%d,父进程:%d' % (os.getpid(),os.getppid()))
while True:
print("唱歌")
time.sleep(1)
def main():
"""一边唱歌一边跳舞"""
print('主进程:%d' % os.getpid())
# 创建一个进程
process_dance = multiprocessing.Process(target=dance)
# 开启
process_dance.start()
# 创建一个进程
process_song = multiprocessing.Process(target=song)
# 开启
process_song.start()
if __name__ == '__main__':
main()
类进程
- 此种创建多进程的流程
- 自定义一个类,继承Process类
- 实现run方法
- 通过自定义的类,创建实例对象
- 调用实例对象的start方法
- 如果想快速的实现一个进程,功能较为简单的话,可以直接创建Process的实例对象
- 如果想实现一个功能较为完整、逻辑较为复杂的进程,可以自定义继承Process类 来实现 ```java from multiprocessing import Process import time
class MyNewProcess(Process): def run(self): while True: print(‘—-1—-‘) time.sleep(1)
if name==’main‘: p = MyNewProcess()
# 调用p.start()方法,p会先去父类中寻找start(),然后在Process的start方法中调用run方法
p.start()
while True:
print('---Main---')
time.sleep(1)
<a name="jnZSx"></a>
### 传递参数
1. 调用Process类创建进程对象时
1. target指明 创建进程后,进程指定的代码是哪个函数
1. args、kwargs用来给 那个函数指明传递的实参
1. args:元组
1. kwargs:字典
```java
import multiprocessing
import os
from time import sleep
def test(name, age, **kwargs):
for i in range(10):
print('子进程运行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
print(kwargs)
sleep(0.2)
if __name__ == '__main__':
p = multiprocessing.Process(target=test, args=('小明', 18), kwargs={"id": 1})
p.start()
sleep(1) # 1秒中之后,立即结束子进程
p.terminate()
p.join()
进程不共享全局变量
multiprocessing.Queue
- 使用Queue()时,若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)
- Queue的几个方法功能说明:
- Queue.qsize():返回当前队列包含的消息数量
- Queue.empty():如果队列为空,返回True,反之False
- Queue.full():如果队列满了,返回True,反之False
- Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True
- 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出Queue.Empty异常
- 如果block值为False,消息列队如果为空,则会立刻抛出Queue.Empty异常
- Queue.get_nowait():相当Queue.get(False)
- Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True
- 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出Queue.Full异常
- 如果block值为False,消息列队如果没有空间可写入,则会立刻抛出Queue.Full异常
- Queue.put_nowait(item):相当Queue.put(item, False)
- 通过使用Queue能够将多个互不相干的进程,能够共享数据,从而能够将一个较大的程序分成多个子功能来实现,每个子功能单独为一个进程,即使其中一个进程因为特殊情况挂掉了,也不会影响整个程序的运行 ```java
import multiprocessing import time
取
def get_data(queue): while True:
# 判断如果不空就获取数据
if not queue.empty():
data = queue.get()
print("取到的数据:", data)
time.sleep(1)
放
def put_data(queue): for temp in range(1000000):
# 判断如果不满 就添加数据
if not queue.full():
queue.put(temp)
print("放入的数据",temp)
time.sleep(1)
def main(): queue = multiprocessing.Queue(3) “””一个读一个取”””
# 这个把数据 放到队列中
multiprocessing.Process(target=put_data, args=(queue,)).start()
# 这个得到队列的数据
multiprocessing.Process(target=get_data, args=(queue,)).start()
if name == ‘main‘: main()
<a name="xeK6v"></a>
### multiprocessing.Pool(进程池)
> 当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。
> 初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务
- apply_async(func[, args[, kwds]]):使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
- close():关闭Pool,使其不再接受新的任务;
- terminate():不管任务是否完成,立即终止;
- join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
```java
from multiprocessing import Pool
import os
import random
import time
def worker(num):
for i in range(5):
print('===pid=%d==num=%d='%(os.getpid(),num))
time.sleep(1)
# 3表示进程池中最多有三个进程一起执行
pool=Pool(3)
for i in range(10):
print('---%d---'%i)
# 向进程中添加任务
# 注意:如果添加的任务数量超过了进程池中进程的个数的话,那么就不会接着往进程池中添加,
# 如果还没有执行的话,他会等待前面的进程结束,然后在往
# 进程池中添加新进程
pool.apply_async(worker,(i,))
pool.close() # 关闭进程池
pool.join() # 主进程在这里等待,只有子进程全部结束之后,在会开启主线程
multiprocessing.Manager().Queue()(进程池通信)
如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
# -*- coding:utf-8 -*-
# 修改import中的Queue为Manager
from multiprocessing import Manager,Pool
import os,time,random
def reader(q):
print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader从Queue获取到消息:%s" % q.get(True))
def writer(q):
print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in "itcast":
q.put(i)
if __name__=="__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue() # 使用Manager中的Queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据
po.apply_async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())