在Python中有很多模块可以创建进程,常用的有os.fork() 函数、multiprocessing模块和Pool进程池。
os.fork() 函数只适合在UNIX/Linux/Mac系统上运行,在Windows中是不可用的。
https://docs.python.org/3.9/library/multiprocessing.html

进程的创建
• 系统初始化
• 一个进程在运行过程中开启了子进程
• 用户的交互式请求,而创建一个新进程
• 一批处理作业的初始化

一、multiprocessing模块创建进程

1.1 介绍

multiprocessing模块提供了一个Process类来代表一个进程对象,由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
语法规则:
Process([group [, target [, name [, args [, kwargs]]]]])
group:参数未使用,值始终是None
target:表示当前进程启动时执行的可调用对象
name:为当前进程实例的别名
args:表示传递给target函数的参数元组
kwargs:表示传递给target函数的参数字典

1.2 方法介绍

p.start() 启动进程,并调用该子进程中的p.run()
p.run() 进程启动时运行的方法,去调用target指定的函数,自定义类的类中一定要实现该方法。
p.terminate() 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁。
p.is_alive() 判断进程实例是否还在运行,如果p仍然运行,返回True
p.join([timeout]) 主线程等待p终止(ps:主线程处于等的状态,而p处于运行的状态)
timeout时可选的超时时间。(ps:p.join只能join住start开启的进程,而不能join住run开启的进程。

1.3 属性介绍

p.daemon 默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name 当前进程实例别名,默认为Process-N,N为从1开始递增的整数
p.pid 进程的pid
p.exidcode 进程在运行时为None、如果为-N,表示被信号N结束
p.authkey 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时,才能成功。

ps:
在windows操作系统中,由于没有fork(Linux操作系统中创建进程的机制),在创建子进程的时候会自动import启动它的文件,而在import子进程的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。必须把创建子进程的部分使用ifname==’main判断保护起来,import的时候,就不会递归运行了。

1.4 使用process模块创建进程

1.4.1 start() 方法

在一个python进程中 开启子进程,使用start() 方法

  1. from multiprocessing import Process ##导入multiprocessing模块
  2. ##执行子程序代码
  3. def test(name):
  4. print('我是子程序',name)
  5. ##执行主程序
  6. def main():
  7. print("主程序开始")
  8. p = Process(target = test,args =('lhuan',)) ##实例化Process类
  9. p.start() ##启动子进程
  10. print("主进程结束")
  11. if __name__ == "__main__":
  12. main()
  13. #结果
  14. 主程序开始
  15. 主进程结束
  16. 我是子程序 lhuan

1.4.2 join() 方法

  1. ##join() 方法 创建子进程
  2. ##p.join([timeout]) 主线程等待p终止(ps:主线程处于等的状态,而p处于运行的状态)
  3. # timeout时可选的超时时间。(ps:p.join只能join住start开启的进程,而不能join住run开启的进程。
  4. import time
  5. from multiprocessing import Process
  6. def f(name):
  7. print('hello',name)
  8. time.sleep(1)
  9. print('子进程')
  10. if __name__=='__main__':
  11. p=Process(target=f,args=('lhuan',))
  12. # p.start()
  13. p.run()
  14. p.join() #如果改为p.run()则后面的不会执行
  15. print('主程序')
  16. ##结果
  17. hello lhuan
  18. 子进程
  19. Traceback (most recent call last):
  20. File "e:/pythonstduy/process/1.1.py", line 31, in <module>
  21. p.join() #如果改为p.run()则后面的不会执行
  22. File "C:\Users\lh\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 139, in join
  23. assert self._popen is not None, 'can only join a started process'
  24. AssertionError: can only join a started process
  25. import time
  26. from multiprocessing import Process
  27. def f(name):
  28. print('hello',name)
  29. time.sleep(1)
  30. print('子进程')
  31. if __name__=='__main__':
  32. p=Process(target=f,args=('lhuan',))
  33. p.start()
  34. # p.run()
  35. p.join() #如果改为p.run()则后面的不会执行
  36. print('主程序')
  37. ##结果
  38. hello lhuan
  39. 子进程
  40. 主程序

1.4.3 两个进程

  1. ##创建两个子进程,并记录子进程的运行时间
  2. from multiprocessing import Process
  3. import time
  4. import os
  5. ##两个子进程会调用的方法
  6. def child_1(interval):
  7. print("子进程(%s)开始执行,父进程为:(%s)"%(os.getpid(),os.getppid()))
  8. t_start = time.time() ##计时开始
  9. time.sleep(interval) ##程序将会被挂起interval秒
  10. t_end = time.time() ##计时结束
  11. print("子进程(%s)执行时间为'%0.2f'秒"%(os.getpid(),t_end-t_start))
  12. def child_2(interval):
  13. print("子进程(%s)开始执行,父进程为:(%s)"%(os.getpid(),os.getppid()))
  14. t_start = time.time() ##计时开始
  15. time.sleep(interval) ##程序将会被挂起interval秒
  16. t_end = time.time() ##计时结束
  17. print("子进程(%s)执行时间为'%0.2f'秒"%(os.getpid(),t_end-t_start))
  18. if __name__ == "__main__":
  19. print("----------------父程序执行----------------")
  20. print("父程序PID :%s "%os.getpid()) ##输出当前程序的ID
  21. P1 = Process(target=child_1,args=(1,)) ##实例化进程p1
  22. P2 = Process(target=child_2,args=(1,)) ##实例化进程p2
  23. P1.start() ##启动进程p1
  24. P2.start() ##启动进程p2
  25. ##同时父进程仍然往下执行,如果p2进程还在执行,将会返回True
  26. print("P1.is_alive=%s"%P1.is_alive())
  27. print("P2.is_alive=%s"%P2.is_alive())
  28. ##输出P1和P2进程的别名和PID
  29. print("P1.NAME: %s"%P1.name)
  30. print("P1.pid: %s"%P1.pid)
  31. print("P2.NAME: %s"%P2.name)
  32. print("P2.pid: %s"%P2.pid)
  33. print("----------------等待子进程----------------")
  34. P1.join() ##等待P1进程结束
  35. P2.join() ##等待P2进程结束
  36. print("----------------父进程执行结束----------------")
  37. ##结果
  38. ----------------父程序执行----------------
  39. 父程序PID :7488
  40. P1.is_alive=True
  41. P2.is_alive=True
  42. P1.NAME: Process-1
  43. P1.pid: 5584
  44. P2.NAME: Process-2
  45. P2.pid: 7396
  46. ----------------等待子进程----------------
  47. 子进程(5584)开始执行,父进程为:(7488)
  48. 子进程(7396)开始执行,父进程为:(7488)
  49. 子进程(5584)执行时间为'1.00'
  50. 子进程(7396)执行时间为'1.00'
  51. ----------------父进程执行结束----------------

1.4.4 多个进程

(ps:子进程的顺序不是根据启动顺序决定的)

  1. ##多个进程
  2. import time,os
  3. from multiprocessing import Process
  4. def f(name):
  5. print('hello',name)
  6. print("子进程的pid=%s"%os.getpid())
  7. time.sleep(1)
  8. if __name__=='__main__':
  9. print("----------------父程序执行----------------")
  10. print("父程序PID :%s "%os.getpid()) ##输出当前程序的ID
  11. p_lst=[]
  12. for i in range(5):
  13. p=Process(target=f,args=('lhuan',))
  14. p.start()
  15. p_lst.append(p)
  16. print(p_lst)
  17. print('----------------父程序结束----------------')
  18. #结果
  19. ----------------父程序执行----------------
  20. 父程序PID :8312
  21. [<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>]
  22. ----------------父程序结束----------------
  23. hello lhuan
  24. hello lhuan
  25. hello lhuan
  26. 子进程的pid=10636
  27. 子进程的pid=7952
  28. 子进程的pid=11536
  29. hello lhuan
  30. 子进程的pid=15144
  31. hello lhuan
  32. 子进程的pid=648
  33. ##多进程同时运行使用join方法
  34. import time,os
  35. from multiprocessing import Process
  36. def f(name):
  37. print('hello',name)
  38. print("子进程的pid=%s"%os.getpid())
  39. time.sleep(1)
  40. if __name__=='__main__':
  41. print("----------------父程序执行----------------")
  42. print("父程序PID :%s "%os.getpid()) ##输出当前程序的ID
  43. p_lst=[]
  44. for i in range(5):
  45. p=Process(target=f,args=('lhuan',))
  46. p.start()
  47. p_lst.append(p)
  48. p.join() ## [p.join() for p in p_lst]作用相同
  49. print(p_lst)
  50. print('----------------父程序结束----------------')
  51. ##结果
  52. ----------------父程序执行----------------
  53. 父程序PID :7872
  54. hello lhuan
  55. 子进程的pid=6660
  56. hello lhuan
  57. 子进程的pid=15296
  58. hello lhuan
  59. 子进程的pid=14956
  60. hello lhuan
  61. 子进程的pid=6976
  62. hello lhuan
  63. 子进程的pid=10276
  64. [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
  65. ----------------父程序结束----------------

二、以继承Process类的形式开启进程

2.1 创建进程

对于简单的任务,可以使用Process(target=test)方式实现多线程。如果要处理复杂任务的进程,通常定义一个类,使其继承Process类,每次实例化这个类的时候,等同于实例化一个一个进程对象。

  1. ##使用Process子类方式创建两个子进程,分别输出父子的进程ID,以及每个子进程的状态和运行时间
  2. from multiprocessing import Process
  3. import os,time
  4. ##继承Process类
  5. class SubProcess(Process):
  6. ##Process本身也具有__init__() 初始化方法,这个子类相当于重写了父类的这个方法
  7. def __init__(self,interval,name=''):
  8. Process.__init__(self) ##调用Process父类的方法
  9. self.interval = interval ##接收参数interval
  10. if name: ##判断name参数是否存在
  11. self.name = name ##如果传递参数name,则为子进程创建name属性,否则使用默认属性
  12. ##重写Process类的run() 方法
  13. def run(self):
  14. print("子进程(%s)开始执行,父进程为(%s)"%(os.getpid(),os.getppid()))
  15. t_start = time.time()
  16. time.sleep(self.interval)
  17. t_stop = time.time()
  18. print("子进程(%s)执行结束,用时(%0.2f)秒"%(os.getpid(),t_stop-t_start))
  19. if __name__ == "__main__":
  20. print("--------------------父进程开始执行--------------------")
  21. print("父进程的PID%s"%(os.getpid())) ##输出当前程序的PID
  22. p1 = SubProcess(interval = 1,name = "mrosoft")
  23. p2 = SubProcess(interval = 2)
  24. ##对一个不包含target属性的Process类执行start()方法,就会运行这个类中的run()方法
  25. p1.start() ##启动进程p1
  26. p2.start() ##启动进程p2
  27. ##输出p1和p2进程的执行状态,如果真正进行,返回True,否则返回False
  28. print("p1.is_alive: %s"%p1.is_alive())
  29. print("p2.is_alive: %s"%p2.is_alive())
  30. ##输出p1和p2进程的别名和PID
  31. print("p1.name: %s"%p1.name)
  32. print("p1.PID: %s"%p1.pid)
  33. print("p2.name: %s"%p2.name)
  34. print("p2.PID: %s"%p2.pid)
  35. print("--------------------等待子进程--------------------")
  36. p1.join() ##等待进程p1结束
  37. p2.join() ##等待进程p2结束
  38. print('--------------------父进程执行结束--------------------')
  39. ##结果
  40. --------------------父进程开始执行--------------------
  41. 父进程的PID14596
  42. p1.is_alive: True
  43. p2.is_alive: True
  44. p1.name: mrosoft
  45. p1.PID: 14308
  46. p2.name: SubProcess-2
  47. p2.PID: 12312
  48. --------------------等待子进程--------------------
  49. 子进程(14308)开始执行,父进程为(14596)
  50. 子进程(12312)开始执行,父进程为(14596)
  51. 子进程(14308)执行结束,用时(1.00)秒
  52. 子进程(12312)执行结束,用时(2.01)秒
  53. --------------------父进程执行结束--------------------
  54. import os
  55. from multiprocessing import Process
  56. class MyProcess(Process):
  57. def __init__(self,name):
  58. super().__init__()
  59. self.name=name
  60. def run(self):
  61. print(os.getpid()) #这句话有点问题
  62. print('%s 正在学习'%self.name)
  63. if __name__=='__main__':
  64. print("--------------------父进程开始执行--------------------")
  65. p1=MyProcess('LH')
  66. p2=MyProcess('lhuan')
  67. p3=MyProcess('HUAN')
  68. p1.start()
  69. p2.start()
  70. p3.start()
  71. print("--------------------等待子进程--------------------")
  72. p1.join()
  73. p2.join()
  74. p3.join()
  75. print('--------------------父进程执行结束--------------------')
  76. ##结果
  77. --------------------父进程开始执行--------------------
  78. --------------------等待子进程--------------------
  79. 6196
  80. lhuan 正在学习
  81. 5108
  82. HUAN 正在学习
  83. 7824
  84. LH 正在学习
  85. --------------------父进程执行结束--------------------

2.2 守护进程

会随着主进程的结束而结束
主进程创建守护进程

  1. 守护进程会在主进程代码执行结束后就终止
  2. 守护进程内无法再开启子进程,否则抛出异常

ps:进程之间是互相独立的,主进程代码运行结束,守护进程随机停止

  1. import os
  2. import time
  3. from multiprocessing import Process
  4. class Myprocess(Process):
  5. def __init__(self,person):
  6. super().__init__()
  7. self.person=person
  8. def run(self):
  9. print(self.person)
  10. print("子进程的pid=%s"%os.getpid())
  11. if __name__=='__main__':
  12. print("----------------父程序执行----------------")
  13. print("父程序PID :%s "%os.getpid()) ##输出当前程序的ID
  14. p=Myprocess('lhuan')
  15. p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
  16. p.start()
  17. print("父程序PID :%s "%os.getpid()) ##输出当前程序的ID
  18. time.sleep(10) #在sleep时查看进程id对应的进程
  19. print('----------------父程序结束----------------')
  20. ##结果
  21. ----------------父程序执行----------------
  22. 父程序PID :2612
  23. 父程序PID :2612
  24. lhuan
  25. 子进程的pid=7364
  26. ----------------父程序结束----------------
  27. 如果p.daemon=Truep.start()之后,就会报下面的错
  28. assert self._popen is None, 'process has already started'
  29. AssertionError: process has already started

三、使用进程池pool创建进程

3.1 介绍

使用multiprocessing模块中提供的Pool类,即进程池来创建几十个或者上百个进程。
进程池对象支持带有超时和回调的异步结果,并具有映射实现。
语法规则:class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes :要使用的工作进程数,如果processes是None,返回os.cpu_count()的使用数
initializer:如果没有初始化,每个工作进程将在启动时调用
maxtasksperchild:工作进程退出之间完成的任务数,可以用新的工作进程替换该任务,来达到释放未使用资源的目的。
context:用于只当用于启动工作进程的上下文。

3.2 Pool类的方法

方法 说明
apply(func[, args[, kwds]]) 使用阻塞方式调用func,会阻塞直到结果准备就绪
apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ] 使用非阻塞方式调用func函数(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程)
args为传递给func的参数列表
kwds为传递给func的关键字列表
map(func,iterable [,chunksize ] ) 会阻塞直到结果准备就绪
将可迭代项分为多个块,将其作为单独的任务提交给流程池,这些块的大小可以通过将chunksize设置为正整数来指定
map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ] ) 使用非阻塞方式的map()
如果指定了callback,那么它应该是一个可以接受单个参数的callable。结果准备就绪时,将对其应用回调,除非调用失败,否则将 应用error_callback。
如果指定了error_callback,那么它应该是可调用的,可以接受单个参数。如果目标函数失败,则使用异常实例调用error_callback。
回调应该立即完成,否则处理结果的线程将被阻塞
close() 关闭Pool,使其不再接收新的任务
terminate() 不管任务是否完成,立即退出
当Pool对象被回收的时候,垃圾terminate() 将立即被调用
join() 主进程阻塞,等待子进程的退出。
ps:必须在close或terminate() 之后使用
ready() 返回呼叫是否完成

阻塞和非阻塞的区别:简单来说,就是可以更好的利用资源,不需要等上一个进程退出就可以执行下一个进程,可以并行执行多个进程。

  1. ##线程池
  2. from multiprocessing import Pool
  3. import os,time
  4. def task(name):
  5. print('子进程(%s) 执行task% s……'%(os.getpid(),name))
  6. time.sleep(1)
  7. if __name__ == '__main__':
  8. print('父进程 (%s):'%os.getpid())
  9. p = Pool(3) ##定义一个线程池,最大线程数为3
  10. for i in range(10): ##从0开始循环10次
  11. p.apply_async(task,args=(i,)) ##使用非阻塞方式调用task函数
  12. print('等待所有子进程结束……')
  13. p.close() ##关闭线程池,关闭后p不再接收新的请求
  14. p.join() ##等待子进程结束
  15. print('所有进程结束')
  16. ##结果
  17. 父进程 (14716):
  18. 等待所有子进程结束……
  19. 子进程(1604) 执行task0……
  20. 子进程(15780) 执行task1……
  21. 子进程(3064) 执行task2……
  22. 子进程(1604) 执行task3……
  23. 子进程(15780) 执行task4……
  24. 子进程(3064) 执行task5……
  25. 子进程(1604) 执行task6……
  26. 子进程(15780) 执行task7……
  27. 子进程(3064) 执行task8……
  28. 子进程(1604) 执行task9……
  29. 所有进程结束

四、进程间通信

4.1 进程间的数据隔离

在一个进程中的结果,没有传递到下一个进程中,即进程之间的数据是隔离的,没有共享信息。

  1. ##进程间的数据隔离
  2. from multiprocessing import Process
  3. import time,os
  4. def plus():
  5. print('------------------子进程1开始------------------')
  6. global g_num
  7. g_num +=50
  8. print("子进程1中g_num的值:%d"%g_num)
  9. print('------------------子进程1结束------------------')
  10. def minus():
  11. print('------------------子进程2开始------------------')
  12. global g_num
  13. g_num -=50
  14. print("子进程2中g_num的值:%d"%g_num)
  15. print('------------------子进程2结束------------------')
  16. g_num = 100 ##定义一个全局变量
  17. if __name__ == '__main__':
  18. print('------------------主进程开始------------------')
  19. print("主进程中g_num的值:%d"%g_num)
  20. p1 = Process(target = plus) ##实例化进程p1
  21. p2 = Process(target = minus) ##实例化进程p2
  22. p1.start() ##开始进程p1
  23. p2.start() ##开始进程p2
  24. p1.join() ##等待进程p1结束
  25. p2.join() ##等待进程p2结束
  26. print("主进程中g_num的值:%d"%g_num)
  27. print('------------------主进程结束------------------')
  28. #结果
  29. ------------------主进程开始------------------
  30. 主进程中g_num的值:100
  31. ------------------子进程1开始------------------
  32. 子进程1g_num的值:150
  33. ------------------子进程1结束------------------
  34. ------------------子进程2开始------------------
  35. 子进程2g_num的值:50
  36. ------------------子进程2结束------------------
  37. 主进程中g_num的值:100
  38. ------------------主进程结束------------------

Python的multiprocessing模块封装了底层的机制,提供了包括Queue(队列)、Pipes(管道)等多种方式来交换数据。

4.2 管道

Pipe()(用于两个进程之间的连接)
语法规则:
multiprocessing.Pipe([duplex])
返回一对代表通信的配管的端部的对象,(conn1, conn2)
如果双工是True(默认值),那么管道是双向的。如果双工是False,则管道是单向的:conn1只能用于接收消息,conn2并且只能用于发送消息。

4.3 队列

队列(Queue)就是模仿现实生活中的排队,先进先出,后来的在队尾。
Queue本身是一个消息队列程序,队列(允许多个生产者和使用者)
语法规则:
class multiprocessing.Queue([maxsize])
括号中表示指定的最大可接收的消息数量,如果没有指定或者数量为负值则表示可接收的消息数量没有上限(直到内存的尽头)。
返回使用管道和一些锁/信号量实现的进程共享队列。当流程首先将项目放入队列时,将启动一个供料器线程,该线程将对象从缓冲区转移到管道中。

4.3.1 常用方法

常用的方法如下

方法 说明
qsize() 返回当前队列包含的消息数量
ps:由于多线程/多处理语义,此数字不可靠。
empty() 如果队列为空,返回True,如果队列不为空,则返回False
ps:由于多线程/多处理语义,此数字不可靠。
full() 如果队列已满,返回True,如果队列不满,则返回False
ps:由于多线程/多处理语义,此数字不可靠。
put(obj [,block [,timeout ] ] ) 将obj放入队列,block默认为Ture,并且timeout是None(默认值)
如果block使用默认值,且没有设置timeout(单位:秒),消息队列如果已经没有空间写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置timeout秒,如果还没有空间,则抛出queue.Full异常
如果block值为False,消息队列没有空间写入,则会立刻抛出queue.Full异常
get([ block [,timeout ] ] ) 获取消息队列中的一条消息,然后将其从队列中删除,block默认值为True
如果block使用默认值,且没有设置timeout(单位:秒),消息队列如果为空,此时程序将被阻塞(停在读取状态),直到消息队列读到消息为至,如果设置timeout秒,则会等待timeout秒,如果还没有读到任何消息,则会抛出 queue.Empty异常
如果block值为False,消息队列为空,则会立刻抛出queue.Empty异常
put_nowait(obj ) 相当于put(obj, False)
get_nowait() 相当于get(False)
close() 当前进程不会再将更多数据放入此队列,后台线程将所有缓冲的数据刷新到管道后将退出。
队列被垃圾回收时,将自动调用此方法。
join_thread() 加入后台线程。仅close()在调用后才能使用。它阻塞直到后台线程退出,以确保缓冲区中的所有数据都已刷新到管道中。
cancel_join_thread() 防止join_thread()阻塞

4.3.2 使用

  1. ##使用process.Queue
  2. from multiprocessing import Queue
  3. if __name__ == '__main__':
  4. q = Queue(3) ##初始化一个Queue对象,最多可接收3条put消息
  5. q.put('消息 1')
  6. print('消息队列是否满了:%s'%q.full())
  7. q.put('消息 2')
  8. print('消息队列是否满了:%s'%q.full())
  9. q.put('消息 3')
  10. print('消息队列是否满了:%s'%q.full())
  11. ##因为消息队列已满,下面的try都会发生异常
  12. try:
  13. q.put('消息4',True,2)
  14. except:
  15. print('消息队列已满,现有消息数量为%d:'%q.qsize())
  16. try:
  17. q.put_nowait('消息4')
  18. except:
  19. print('消息队列已满,现有消息数量为%d:'%q.qsize())
  20. ##读取消息
  21. ##判断消息队列是否为空,然后再读取
  22. if not q.empty():
  23. print('----------------从消息队列中获取消息----------------')
  24. print('消息队列的数量: %d'%q.qsize())
  25. for i in range(q.qsize()):
  26. print(q.get_nowait())
  27. print('消息队列的数量: %d'%q.qsize())
  28. ##写入消息
  29. ##判断消息队列是否满了,然后再写入
  30. if not q.full():
  31. print('----------------向消息队列中写入信息----------------')
  32. q.put_nowait('消息4')
  33. print('消息队列的数量: %d'%q.qsize())
  34. ##结果
  35. 消息队列是否满了:False
  36. 消息队列是否满了:False
  37. 消息队列是否满了:True
  38. 消息队列已满,现有消息数量为3
  39. 消息队列已满,现有消息数量为3
  40. ----------------从消息队列中获取消息----------------
  41. 消息队列的数量: 3
  42. 消息 1
  43. 消息 2
  44. 消息 3
  45. 消息队列的数量: 0
  46. ----------------向消息队列中写入信息----------------
  47. 消息队列的数量: 1

结合Process和Queue实现进程间的通信

  1. ##创建两个子进程,一个子进程负责向队列中写入数据,另一个进程负责从队列中读取数据。
  2. from multiprocessing import Queue,Process
  3. import time
  4. ##向队列中写入数据
  5. def write_task(q):
  6. if not q.full():
  7. for i in range(5):
  8. message = "消息" + str(i)
  9. q.put(message)
  10. print('写入: %s'%message)
  11. ##从队列中读取数据
  12. def read_task(q):
  13. time.sleep(1)
  14. if not q.empty():
  15. for i in range(q.qsize()):
  16. print('读取: %s'%q.get(True,2)) ##等待两秒,如果还没读到任何消息,就抛出queue.Empty异常
  17. if __name__ == "__main__":
  18. print('--------------父进程开始------------------')
  19. q = Queue()
  20. pw = Process(target = write_task,args = (q,)) ##实例化写入队列,并且传递队列
  21. pr = Process(target = read_task,args = (q,)) ##实例化读取队列,并且传递队列
  22. pw.start() ##启动子进程pw,写入
  23. pr.start() ##启动子进程pr,读取
  24. pw.join() ##等待子进程pw结束
  25. pr.join() ##等待子进程pr结束
  26. print('--------------父进程结束------------------')
  27. ##结果
  28. --------------父进程开始------------------
  29. 写入: 消息0
  30. 写入: 消息1
  31. 写入: 消息2
  32. 写入: 消息3
  33. 写入: 消息4
  34. 读取: 消息0
  35. 读取: 消息1
  36. 读取: 消息2
  37. 读取: 消息3
  38. 读取: 消息4
  39. --------------父进程结束------------------