在Python中有很多模块可以创建进程,常用的有os.fork() 函数、multiprocessing模块和Pool进程池。
os.fork() 函数只适合在UNIX/Linux/Mac系统上运行,在Windows中是不可用的。
https://docs.python.org/3.9/library/multiprocessing.html
进程的创建
• 系统初始化
• 一个进程在运行过程中开启了子进程
• 用户的交互式请求,而创建一个新进程
• 一批处理作业的初始化
一、multiprocessing模块创建进程
1.1 介绍
multiprocessing模块提供了一个Process类来代表一个进程对象,由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
语法规则:Process([group [, target [, name [, args [, kwargs]]]]])
group:参数未使用,值始终是None
target:表示当前进程启动时执行的可调用对象
name:为当前进程实例的别名
args:表示传递给target函数的参数元组
kwargs:表示传递给target函数的参数字典
1.2 方法介绍
| p.start() | 启动进程,并调用该子进程中的p.run() |
|---|---|
| p.run() | 进程启动时运行的方法,去调用target指定的函数,自定义类的类中一定要实现该方法。 |
| p.terminate() | 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁。 |
| p.is_alive() | 判断进程实例是否还在运行,如果p仍然运行,返回True |
| p.join([timeout]) | 主线程等待p终止(ps:主线程处于等的状态,而p处于运行的状态) timeout时可选的超时时间。(ps:p.join只能join住start开启的进程,而不能join住run开启的进程。 |
1.3 属性介绍
| p.daemon | 默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 |
|---|---|
| p.name | 当前进程实例别名,默认为Process-N,N为从1开始递增的整数 |
| p.pid | 进程的pid |
| p.exidcode | 进程在运行时为None、如果为-N,表示被信号N结束 |
| p.authkey | 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时,才能成功。 |
ps:
在windows操作系统中,由于没有fork(Linux操作系统中创建进程的机制),在创建子进程的时候会自动import启动它的文件,而在import子进程的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。必须把创建子进程的部分使用ifname==’main‘判断保护起来,import的时候,就不会递归运行了。
1.4 使用process模块创建进程
1.4.1 start() 方法
在一个python进程中 开启子进程,使用start() 方法
from multiprocessing import Process ##导入multiprocessing模块##执行子程序代码def test(name):print('我是子程序',name)##执行主程序def main():print("主程序开始")p = Process(target = test,args =('lhuan',)) ##实例化Process类p.start() ##启动子进程print("主进程结束")if __name__ == "__main__":main()#结果主程序开始主进程结束我是子程序 lhuan
1.4.2 join() 方法
##join() 方法 创建子进程##p.join([timeout]) 主线程等待p终止(ps:主线程处于等的状态,而p处于运行的状态)# timeout时可选的超时时间。(ps:p.join只能join住start开启的进程,而不能join住run开启的进程。import timefrom multiprocessing import Processdef f(name):print('hello',name)time.sleep(1)print('子进程')if __name__=='__main__':p=Process(target=f,args=('lhuan',))# p.start()p.run()p.join() #如果改为p.run()则后面的不会执行print('主程序')##结果hello lhuan子进程Traceback (most recent call last):File "e:/pythonstduy/process/1.1.py", line 31, in <module>p.join() #如果改为p.run()则后面的不会执行File "C:\Users\lh\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 139, in joinassert self._popen is not None, 'can only join a started process'AssertionError: can only join a started processimport timefrom multiprocessing import Processdef f(name):print('hello',name)time.sleep(1)print('子进程')if __name__=='__main__':p=Process(target=f,args=('lhuan',))p.start()# p.run()p.join() #如果改为p.run()则后面的不会执行print('主程序')##结果hello lhuan子进程主程序
1.4.3 两个进程
##创建两个子进程,并记录子进程的运行时间from multiprocessing import Processimport timeimport os##两个子进程会调用的方法def child_1(interval):print("子进程(%s)开始执行,父进程为:(%s)"%(os.getpid(),os.getppid()))t_start = time.time() ##计时开始time.sleep(interval) ##程序将会被挂起interval秒t_end = time.time() ##计时结束print("子进程(%s)执行时间为'%0.2f'秒"%(os.getpid(),t_end-t_start))def child_2(interval):print("子进程(%s)开始执行,父进程为:(%s)"%(os.getpid(),os.getppid()))t_start = time.time() ##计时开始time.sleep(interval) ##程序将会被挂起interval秒t_end = time.time() ##计时结束print("子进程(%s)执行时间为'%0.2f'秒"%(os.getpid(),t_end-t_start))if __name__ == "__main__":print("----------------父程序执行----------------")print("父程序PID :%s "%os.getpid()) ##输出当前程序的IDP1 = Process(target=child_1,args=(1,)) ##实例化进程p1P2 = Process(target=child_2,args=(1,)) ##实例化进程p2P1.start() ##启动进程p1P2.start() ##启动进程p2##同时父进程仍然往下执行,如果p2进程还在执行,将会返回Trueprint("P1.is_alive=%s"%P1.is_alive())print("P2.is_alive=%s"%P2.is_alive())##输出P1和P2进程的别名和PIDprint("P1.NAME: %s"%P1.name)print("P1.pid: %s"%P1.pid)print("P2.NAME: %s"%P2.name)print("P2.pid: %s"%P2.pid)print("----------------等待子进程----------------")P1.join() ##等待P1进程结束P2.join() ##等待P2进程结束print("----------------父进程执行结束----------------")##结果----------------父程序执行----------------父程序PID :7488P1.is_alive=TrueP2.is_alive=TrueP1.NAME: Process-1P1.pid: 5584P2.NAME: Process-2P2.pid: 7396----------------等待子进程----------------子进程(5584)开始执行,父进程为:(7488)子进程(7396)开始执行,父进程为:(7488)子进程(5584)执行时间为'1.00'秒子进程(7396)执行时间为'1.00'秒----------------父进程执行结束----------------
1.4.4 多个进程
(ps:子进程的顺序不是根据启动顺序决定的)
##多个进程import time,osfrom multiprocessing import Processdef f(name):print('hello',name)print("子进程的pid=%s"%os.getpid())time.sleep(1)if __name__=='__main__':print("----------------父程序执行----------------")print("父程序PID :%s "%os.getpid()) ##输出当前程序的IDp_lst=[]for i in range(5):p=Process(target=f,args=('lhuan',))p.start()p_lst.append(p)print(p_lst)print('----------------父程序结束----------------')#结果----------------父程序执行----------------父程序PID :8312[<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>]----------------父程序结束----------------hello lhuanhello lhuanhello lhuan子进程的pid=10636子进程的pid=7952子进程的pid=11536hello lhuan子进程的pid=15144hello lhuan子进程的pid=648##多进程同时运行使用join方法import time,osfrom multiprocessing import Processdef f(name):print('hello',name)print("子进程的pid=%s"%os.getpid())time.sleep(1)if __name__=='__main__':print("----------------父程序执行----------------")print("父程序PID :%s "%os.getpid()) ##输出当前程序的IDp_lst=[]for i in range(5):p=Process(target=f,args=('lhuan',))p.start()p_lst.append(p)p.join() ## [p.join() for p in p_lst]作用相同print(p_lst)print('----------------父程序结束----------------')##结果----------------父程序执行----------------父程序PID :7872hello lhuan子进程的pid=6660hello lhuan子进程的pid=15296hello lhuan子进程的pid=14956hello lhuan子进程的pid=6976hello lhuan子进程的pid=10276[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]----------------父程序结束----------------
二、以继承Process类的形式开启进程
2.1 创建进程
对于简单的任务,可以使用Process(target=test)方式实现多线程。如果要处理复杂任务的进程,通常定义一个类,使其继承Process类,每次实例化这个类的时候,等同于实例化一个一个进程对象。
##使用Process子类方式创建两个子进程,分别输出父子的进程ID,以及每个子进程的状态和运行时间from multiprocessing import Processimport os,time##继承Process类class SubProcess(Process):##Process本身也具有__init__() 初始化方法,这个子类相当于重写了父类的这个方法def __init__(self,interval,name=''):Process.__init__(self) ##调用Process父类的方法self.interval = interval ##接收参数intervalif name: ##判断name参数是否存在self.name = name ##如果传递参数name,则为子进程创建name属性,否则使用默认属性##重写Process类的run() 方法def run(self):print("子进程(%s)开始执行,父进程为(%s)"%(os.getpid(),os.getppid()))t_start = time.time()time.sleep(self.interval)t_stop = time.time()print("子进程(%s)执行结束,用时(%0.2f)秒"%(os.getpid(),t_stop-t_start))if __name__ == "__main__":print("--------------------父进程开始执行--------------------")print("父进程的PID%s"%(os.getpid())) ##输出当前程序的PIDp1 = SubProcess(interval = 1,name = "mrosoft")p2 = SubProcess(interval = 2)##对一个不包含target属性的Process类执行start()方法,就会运行这个类中的run()方法p1.start() ##启动进程p1p2.start() ##启动进程p2##输出p1和p2进程的执行状态,如果真正进行,返回True,否则返回Falseprint("p1.is_alive: %s"%p1.is_alive())print("p2.is_alive: %s"%p2.is_alive())##输出p1和p2进程的别名和PIDprint("p1.name: %s"%p1.name)print("p1.PID: %s"%p1.pid)print("p2.name: %s"%p2.name)print("p2.PID: %s"%p2.pid)print("--------------------等待子进程--------------------")p1.join() ##等待进程p1结束p2.join() ##等待进程p2结束print('--------------------父进程执行结束--------------------')##结果--------------------父进程开始执行--------------------父进程的PID14596p1.is_alive: Truep2.is_alive: Truep1.name: mrosoftp1.PID: 14308p2.name: SubProcess-2p2.PID: 12312--------------------等待子进程--------------------子进程(14308)开始执行,父进程为(14596)子进程(12312)开始执行,父进程为(14596)子进程(14308)执行结束,用时(1.00)秒子进程(12312)执行结束,用时(2.01)秒--------------------父进程执行结束--------------------import osfrom multiprocessing import Processclass MyProcess(Process):def __init__(self,name):super().__init__()self.name=namedef run(self):print(os.getpid()) #这句话有点问题print('%s 正在学习'%self.name)if __name__=='__main__':print("--------------------父进程开始执行--------------------")p1=MyProcess('LH')p2=MyProcess('lhuan')p3=MyProcess('HUAN')p1.start()p2.start()p3.start()print("--------------------等待子进程--------------------")p1.join()p2.join()p3.join()print('--------------------父进程执行结束--------------------')##结果--------------------父进程开始执行----------------------------------------等待子进程--------------------6196lhuan 正在学习5108HUAN 正在学习7824LH 正在学习--------------------父进程执行结束--------------------
2.2 守护进程
会随着主进程的结束而结束
主进程创建守护进程
- 守护进程会在主进程代码执行结束后就终止
- 守护进程内无法再开启子进程,否则抛出异常
ps:进程之间是互相独立的,主进程代码运行结束,守护进程随机停止
import osimport timefrom multiprocessing import Processclass Myprocess(Process):def __init__(self,person):super().__init__()self.person=persondef run(self):print(self.person)print("子进程的pid=%s"%os.getpid())if __name__=='__main__':print("----------------父程序执行----------------")print("父程序PID :%s "%os.getpid()) ##输出当前程序的IDp=Myprocess('lhuan')p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行p.start()print("父程序PID :%s "%os.getpid()) ##输出当前程序的IDtime.sleep(10) #在sleep时查看进程id对应的进程print('----------------父程序结束----------------')##结果----------------父程序执行----------------父程序PID :2612父程序PID :2612lhuan子进程的pid=7364----------------父程序结束----------------如果p.daemon=True在p.start()之后,就会报下面的错assert self._popen is None, 'process has already started'AssertionError: process has already started
三、使用进程池pool创建进程
3.1 介绍
使用multiprocessing模块中提供的Pool类,即进程池来创建几十个或者上百个进程。
进程池对象支持带有超时和回调的异步结果,并具有映射实现。
语法规则:class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes :要使用的工作进程数,如果processes是None,返回os.cpu_count()的使用数
initializer:如果没有初始化,每个工作进程将在启动时调用
maxtasksperchild:工作进程退出之间完成的任务数,可以用新的工作进程替换该任务,来达到释放未使用资源的目的。
context:用于只当用于启动工作进程的上下文。
3.2 Pool类的方法
| 方法 | 说明 |
|---|---|
| apply(func[, args[, kwds]]) | 使用阻塞方式调用func,会阻塞直到结果准备就绪 |
| apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ] | 使用非阻塞方式调用func函数(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程) args为传递给func的参数列表 kwds为传递给func的关键字列表 |
| map(func,iterable [,chunksize ] ) | 会阻塞直到结果准备就绪 将可迭代项分为多个块,将其作为单独的任务提交给流程池,这些块的大小可以通过将chunksize设置为正整数来指定 |
| map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ] ) | 使用非阻塞方式的map() 如果指定了callback,那么它应该是一个可以接受单个参数的callable。结果准备就绪时,将对其应用回调,除非调用失败,否则将 应用error_callback。 如果指定了error_callback,那么它应该是可调用的,可以接受单个参数。如果目标函数失败,则使用异常实例调用error_callback。 回调应该立即完成,否则处理结果的线程将被阻塞 |
| close() | 关闭Pool,使其不再接收新的任务 |
| terminate() | 不管任务是否完成,立即退出 当Pool对象被回收的时候,垃圾terminate() 将立即被调用 |
| join() | 主进程阻塞,等待子进程的退出。 ps:必须在close或terminate() 之后使用 |
| ready() | 返回呼叫是否完成 |
阻塞和非阻塞的区别:简单来说,就是可以更好的利用资源,不需要等上一个进程退出就可以执行下一个进程,可以并行执行多个进程。
##线程池from multiprocessing import Poolimport os,timedef task(name):print('子进程(%s) 执行task% s……'%(os.getpid(),name))time.sleep(1)if __name__ == '__main__':print('父进程 (%s):'%os.getpid())p = Pool(3) ##定义一个线程池,最大线程数为3for i in range(10): ##从0开始循环10次p.apply_async(task,args=(i,)) ##使用非阻塞方式调用task函数print('等待所有子进程结束……')p.close() ##关闭线程池,关闭后p不再接收新的请求p.join() ##等待子进程结束print('所有进程结束')##结果父进程 (14716):等待所有子进程结束……子进程(1604) 执行task0……子进程(15780) 执行task1……子进程(3064) 执行task2……子进程(1604) 执行task3……子进程(15780) 执行task4……子进程(3064) 执行task5……子进程(1604) 执行task6……子进程(15780) 执行task7……子进程(3064) 执行task8……子进程(1604) 执行task9……所有进程结束
四、进程间通信
4.1 进程间的数据隔离
在一个进程中的结果,没有传递到下一个进程中,即进程之间的数据是隔离的,没有共享信息。
##进程间的数据隔离from multiprocessing import Processimport time,osdef plus():print('------------------子进程1开始------------------')global g_numg_num +=50print("子进程1中g_num的值:%d"%g_num)print('------------------子进程1结束------------------')def minus():print('------------------子进程2开始------------------')global g_numg_num -=50print("子进程2中g_num的值:%d"%g_num)print('------------------子进程2结束------------------')g_num = 100 ##定义一个全局变量if __name__ == '__main__':print('------------------主进程开始------------------')print("主进程中g_num的值:%d"%g_num)p1 = Process(target = plus) ##实例化进程p1p2 = Process(target = minus) ##实例化进程p2p1.start() ##开始进程p1p2.start() ##开始进程p2p1.join() ##等待进程p1结束p2.join() ##等待进程p2结束print("主进程中g_num的值:%d"%g_num)print('------------------主进程结束------------------')#结果------------------主进程开始------------------主进程中g_num的值:100------------------子进程1开始------------------子进程1中g_num的值:150------------------子进程1结束------------------------------------子进程2开始------------------子进程2中g_num的值:50------------------子进程2结束------------------主进程中g_num的值:100------------------主进程结束------------------
Python的multiprocessing模块封装了底层的机制,提供了包括Queue(队列)、Pipes(管道)等多种方式来交换数据。
4.2 管道
Pipe()(用于两个进程之间的连接)
语法规则:multiprocessing.Pipe([duplex])
返回一对代表通信的配管的端部的对象,(conn1, conn2)
如果双工是True(默认值),那么管道是双向的。如果双工是False,则管道是单向的:conn1只能用于接收消息,conn2并且只能用于发送消息。
4.3 队列
队列(Queue)就是模仿现实生活中的排队,先进先出,后来的在队尾。
Queue本身是一个消息队列程序,队列(允许多个生产者和使用者)
语法规则:class multiprocessing.Queue([maxsize])
括号中表示指定的最大可接收的消息数量,如果没有指定或者数量为负值则表示可接收的消息数量没有上限(直到内存的尽头)。
返回使用管道和一些锁/信号量实现的进程共享队列。当流程首先将项目放入队列时,将启动一个供料器线程,该线程将对象从缓冲区转移到管道中。
4.3.1 常用方法
常用的方法如下
| 方法 | 说明 |
|---|---|
| qsize() | 返回当前队列包含的消息数量 ps:由于多线程/多处理语义,此数字不可靠。 |
| empty() | 如果队列为空,返回True,如果队列不为空,则返回False ps:由于多线程/多处理语义,此数字不可靠。 |
| full() | 如果队列已满,返回True,如果队列不满,则返回False ps:由于多线程/多处理语义,此数字不可靠。 |
| put(obj [,block [,timeout ] ] ) | 将obj放入队列,block默认为Ture,并且timeout是None(默认值) 如果block使用默认值,且没有设置timeout(单位:秒),消息队列如果已经没有空间写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置timeout秒,如果还没有空间,则抛出queue.Full异常 如果block值为False,消息队列没有空间写入,则会立刻抛出queue.Full异常 |
| get([ block [,timeout ] ] ) | 获取消息队列中的一条消息,然后将其从队列中删除,block默认值为True 如果block使用默认值,且没有设置timeout(单位:秒),消息队列如果为空,此时程序将被阻塞(停在读取状态),直到消息队列读到消息为至,如果设置timeout秒,则会等待timeout秒,如果还没有读到任何消息,则会抛出 queue.Empty异常 如果block值为False,消息队列为空,则会立刻抛出queue.Empty异常 |
| put_nowait(obj ) | 相当于put(obj, False) |
| get_nowait() | 相当于get(False) |
| close() | 当前进程不会再将更多数据放入此队列,后台线程将所有缓冲的数据刷新到管道后将退出。 队列被垃圾回收时,将自动调用此方法。 |
| join_thread() | 加入后台线程。仅close()在调用后才能使用。它阻塞直到后台线程退出,以确保缓冲区中的所有数据都已刷新到管道中。 |
| cancel_join_thread() | 防止join_thread()阻塞 |
4.3.2 使用
##使用process.Queuefrom multiprocessing import Queueif __name__ == '__main__':q = Queue(3) ##初始化一个Queue对象,最多可接收3条put消息q.put('消息 1')print('消息队列是否满了:%s'%q.full())q.put('消息 2')print('消息队列是否满了:%s'%q.full())q.put('消息 3')print('消息队列是否满了:%s'%q.full())##因为消息队列已满,下面的try都会发生异常try:q.put('消息4',True,2)except:print('消息队列已满,现有消息数量为%d:'%q.qsize())try:q.put_nowait('消息4')except:print('消息队列已满,现有消息数量为%d:'%q.qsize())##读取消息##判断消息队列是否为空,然后再读取if not q.empty():print('----------------从消息队列中获取消息----------------')print('消息队列的数量: %d'%q.qsize())for i in range(q.qsize()):print(q.get_nowait())print('消息队列的数量: %d'%q.qsize())##写入消息##判断消息队列是否满了,然后再写入if not q.full():print('----------------向消息队列中写入信息----------------')q.put_nowait('消息4')print('消息队列的数量: %d'%q.qsize())##结果消息队列是否满了:False消息队列是否满了:False消息队列是否满了:True消息队列已满,现有消息数量为3:消息队列已满,现有消息数量为3:----------------从消息队列中获取消息----------------消息队列的数量: 3消息 1消息 2消息 3消息队列的数量: 0----------------向消息队列中写入信息----------------消息队列的数量: 1
结合Process和Queue实现进程间的通信
##创建两个子进程,一个子进程负责向队列中写入数据,另一个进程负责从队列中读取数据。from multiprocessing import Queue,Processimport time##向队列中写入数据def write_task(q):if not q.full():for i in range(5):message = "消息" + str(i)q.put(message)print('写入: %s'%message)##从队列中读取数据def read_task(q):time.sleep(1)if not q.empty():for i in range(q.qsize()):print('读取: %s'%q.get(True,2)) ##等待两秒,如果还没读到任何消息,就抛出queue.Empty异常if __name__ == "__main__":print('--------------父进程开始------------------')q = Queue()pw = Process(target = write_task,args = (q,)) ##实例化写入队列,并且传递队列pr = Process(target = read_task,args = (q,)) ##实例化读取队列,并且传递队列pw.start() ##启动子进程pw,写入pr.start() ##启动子进程pr,读取pw.join() ##等待子进程pw结束pr.join() ##等待子进程pr结束print('--------------父进程结束------------------')##结果--------------父进程开始------------------写入: 消息0写入: 消息1写入: 消息2写入: 消息3写入: 消息4读取: 消息0读取: 消息1读取: 消息2读取: 消息3读取: 消息4--------------父进程结束------------------
