模块:os.fork,multiprocessing(前者支持linux,ulinx.后者则是跨平台)
os.fork:普通方法都是调用一次返回一次,这个方法调用一次返回两次。原因在于操作系统将当前进程(父进程)复制出一份进程(子进程),这两个进程几乎完全相同,于是fork方法分别在父进程和子进程中返回。子进程中永远返回0,父进程中返回的是子进程的ID
import os
if __name__=='__main__':
print('current Process (%s) start...'%(os.getpid())) #getpid,获取当前进程id
pid= os.fork()
if pid < 0:
print('error in fork')
elif pid==0:
print('i am child process(%s) and my parent process is (%s)'%(os.getpid(),os.getppid())) #getppid获取父进程id
else:print('i(%s) created a chlid process (%s).'%(os.getpid(),pid))
current Process (1032419) start...
i(1032419) created a chlid process (1032420).
i am child process(1032420) and my parent process is (1032419)
multiprocessing 模块创建多进程
multiprocessing模块提供了一个process类来描述进程对象,只需要传入一个执行函数和函数道德参数,就可以完成process的实例创建
启动start(),join()实现同步
import os
from multiprocessing import Process
#定义一个子进程执行任务
def run_proc(name):
print('child process %s (%s) running...' %(name,os.getpid()))
if __name__=='__main__': #__name__ == '__main__' 就表示在当前文件中,可以在if __name__ == '__main__':条件下写入测试代码,如此可以避免测试代码在模块被导入后执行。
print('parent process %s.'% os.getpid())
for i in range(5):
p=Process(target=run_proc,args=str(i),) #
print('process will start')
p.start()
p.join()
print('end')
parent process 2700.
process will start
process will start
process will start
process will start
process will start
child process 0 (11232) running...
child process 1 (30888) running...
child process 3 (19928) running...
child process 2 (22464) running...
child process 4 (13168) running...
end
从结果来看python先执行了 print(‘parent process %s.’% os.getpid())
然后在p=Process(target=run_proc,args=str(i),) 完成了5次实例创建后打印了五次print(‘process will start’)
之后在 p.start()启动了进程,当所有都执行完毕后同步了进程,同步打印了五次要是将p.join(),p.start()放到同一缩进里,那么在执行打印process will start时会同步child process 0 (11232) running…
multiprocessing模块提供了一个Pool类来代表进程池对象(解决:multiprocessing中的Process动态生成多个进程,如果是上百个、上千个目标,手动去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。)
Pool可以提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它
import os,time,random
from multiprocessing import Process,Pool
def run_task(name): #定义一个子进程
print('task %s (pid= %s) is runing..'% (name,os.getpid()))
time.sleep(random.random()*3)
print('task %s end' % name)
if __name__=='__main__':
print('current process %s.'% os.getpid())
p = Pool(processes=2) #设置线程池大小2进程
for i in range(5):
p.apply_async(run_task,args=(i,))
print('waiting for all subprocesses done...')
p.close()#暂停添加新的进程
p.join() #等待所有子进程执行完毕
print('all subprocesses done')
current process 28252.
waiting for all subprocesses done...
task 0 (pid= 22048) is runing..
task 1 (pid= 19808) is runing..
task 1 end
task 2 (pid= 19808) is runing..
task 0 end
task 3 (pid= 22048) is runing..
task 3 end
task 4 (pid= 22048) is runing..
task 2 end
task 4 end
all subprocesses done
先执行父进程,打印current process %s.’% os.getpid()
if __name__=='__main__':
print('current process %s.'% os.getpid())
然后对线程池大小进行设置
p = Pool(processes=2)
之后启用for循环,创建5个子进程,name=i=range(5).之后打印waiting for all subprocesses done…
for i in range(5):
p.apply_async(run_task,args=(i,))
print('waiting for all subprocesses done...')
之后其实是先p.join执行2个实例进程时进入阻塞状态,同时暂停添加新进程。执行完毕后打印all subprocesses done
p.close()#暂停添加新的进程
p.join() #执行实例进程等待所有子进程执行完毕
print('all subprocesses done')
import os,time,random
from multiprocessing import Process,Pool
def run_task(name): #定义一个子进程
print('task %s (pid= %s) is runing..'% (name,os.getpid()))
time.sleep(random.random()*3)
print('task %s end' % name)
上面是实例创建后要执行的函数,当进程数执行完毕后,执行以下命令 time.sleep(random.random()*3),print(‘task %s end’ % name),同时添加新的进程加入执行。
进程间通信
QueQue
可以实现多个进程间的数据传递,方法:Put和Get
Put:插入数据到队列中。它还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
GET:可以从队列读取并且删除元素。
同样,Get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,分两种情况:如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常
from multiprocessing import Process,Queue
import os , time, random
def proc_write(q,urls):#子进程写入任务
print('process(%s) is writing...' % os.getpid())
for url in urls:
q.put(url)
print('put %s to queque' % os.getpid())
time.sleep(random.random())
def proc_read(q): #子进程读取任务
print('Process (%s) is reading' % os.getpid())
while True:
url=q.get(True)
print('Get %s from queque' % url)
if __name__=='__main__': #父进程
q = Queue() #创建QueQue 多进程安全队列
proc_write1= Process(target=proc_write,args=(q,['url_1','URL_2'])) #write队列1
proc_write2 = Process(target=proc_write, args=(q, ['url_3', 'URL_4'])) #write队列2
proc_readed=Process(target=proc_read,args=(q,))#read队列1
#开始proc_write子进程
proc_write1.start()
proc_write2.start()
#开始proc_read子进程
proc_readed.start()
#等待write结束
proc_write1.join() 同步进程
proc_write2.join()
proc_readed.terminate() 强行终止
process(2472) is writing...
put 2472 to queque
process(16816) is writing...
put 16816 to queque
Process (13072) is reading
Get url_1 from queque
Get url_3 from queque
put 16816 to queque
Get URL_4 from queque
put 2472 to queque
Get URL_2 from queque
父进程创建2个write队列,通过target实例化子进程,args传递参数,[‘url_1’,’URL_2’] 数组传递给了urls
if __name__=='__main__': #父进程
q = Queue() #创建QueQue 多进程安全队列
proc_write1= Process(target=proc_write,args=(q,['url_1','URL_2'])) #write队列1
proc_write2 = Process(target=proc_write, args=(q, ['url_3', 'URL_4'])) #write队列2
proc_readed=Process(target=proc_read,args=(q,))#read队列1
开始队列
proc_write1.start()
proc_write2.start()
#开始proc_read子进程
proc_readed.start()
先打印print(‘process(%s) is writing…’ % os.getpid()),然后进入循环,队列传输数组,将第一列数组插入队列中并打印put %s to queque’ % os.getpid()*3因为一组有三条,通过proc_read的q.get将队列里的数据赋值给url,url=q.get(True).,并打印 print(‘Get %s from queque’ % url)
def proc_write(q,urls):#子进程写入任务
print('process(%s) is writing...' % os.getpid())
for url in urls:
q.put(url)
print('put %s to queque' % os.getpid())
time.sleep(random.random())
def proc_read(q): #子进程读取任务
print('Process (%s) is reading' % os.getpid())
while True:
url=q.get(True)
print('Get %s from queque' % url)
进程间通信
Pipe(常用于两个进程间通信)
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。若duplex为False, conn1只负责接收消息,conn2只负责发送消息。send和recv方法分别是发送和接收消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError
import multiprocessing
import os , time, random
def proc_send(pipe,urls):
for url in urls:
print("process(%s) send: %s" %(os.getpid(),url))
pipe.send(url) #发送
time.sleep(random.random())
def proc_recv(pipe):
while True:
print("Process(%s) rec:%s" %(os.getpid(),pipe.recv()) ) #接收
time.sleep(random.random())
if __name__=="__main__":
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i)for i in range(10)]))
p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.join()