0x01:创建进程

multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。

示例:创建一个进程,执行两个死循环。

  1. from multiprocessing import Process
  2. import time
  3. def run_proc():
  4. """子进程要执行的代码"""
  5. while True:
  6. print("----2----")
  7. time.sleep(1)
  8. if __name__=='__main__':
  9. p = Process(target=run_proc)
  10. p.start()
  11. while True:
  12. print("----1----")
  13. time.sleep(1)

说明

  • 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动

0x02:方法说明

Process( target [, name [, args [, kwargs]]])

  • target:如果传递了函数的引用,可以任务这个子进程就执行这里的代码
  • args:给target指定的函数传递的参数,以元组的方式传递
  • kwargs:给target指定的函数传递命名参数
  • name:给进程设定一个名字,可以不设定

Process创建的实例对象的常用方法:

  • start():启动子进程实例(创建子进程)
  • is_alive():判断进程子进程是否还在活着
  • join([timeout]):是否等待子进程执行结束,或等待多少秒
  • terminate():不管任务是否完成,立即终止子进程

Process创建的实例对象的常用属性:

  • name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
  • pid:当前进程的pid(进程号)

实例:

  1. from multiprocessing import Process
  2. import os
  3. from time import sleep
  4. def run_proc(name, age, **kwargs):
  5. for i in range(10):
  6. print('子进程运行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
  7. print(kwargs)
  8. sleep(0.2)
  9. if __name__=='__main__':
  10. p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
  11. p.start()
  12. sleep(1) # 1秒中之后,立即结束子进程
  13. p.terminate()
  14. p.join()

0x03:Pool

开启过多的进程并不能提高你的效率,反而会降低你的效率,假设有500个任务,同时开启500个进程,这500个进程除了不能一起执行之外(cpu没有那么多核),操作系统调度这500个进程,让他们平均在4个或8个cpu上执行,这会占用很大的空间。

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

  1. def task(n):
  2. print('{}----->start'.format(n))
  3. time.sleep(1)
  4. print('{}------>end'.format(n))
  5. if __name__ == '__main__':
  6. p = Pool(8) # 创建进程池,并指定线程池的个数,默认是CPU的核数
  7. for i in range(1, 11):
  8. # p.apply(task, args=(i,)) # 同步执行任务,一个一个的执行任务,没有并发效果
  9. p.apply_async(task, args=(i,)) # 异步执行任务,可以达到并发效果
  10. p.close()
  11. p.join()
  1. 进程池获取任务的执行结果:
  2. def task(n):
  3. print('{}----->start'.format(n))
  4. time.sleep(1)
  5. print('{}------>end'.format(n))
  6. return n ** 2
  7. if __name__ == '__main__':
  8. p = Pool(4)
  9. for i in range(1, 11):
  10. res = p.apply_async(task, args=(i,)) # res 是任务的执行结果
  11. print(res.get()) # 直接获取结果的弊端是,多任务又变成同步的了
  12. p.close()
  13. # p.join() 不需要再join了,因为 res.get()本身就是一个阻塞方法
  1. 异步获取线程的执行结果:
  2. import time
  3. from multiprocessing.pool import Pool
  4. def task(n):
  5. print('{}----->start'.format(n))
  6. time.sleep(1)
  7. print('{}------>end'.format(n))
  8. return n ** 2
  9. if __name__ == '__main__':
  10. p = Pool(4)
  11. res_list = []
  12. for i in range(1, 11):
  13. res = p.apply_async(task, args=(i,))
  14. res_list.append(res) # 使用列表来保存进程执行结果
  15. for re in res_list:
  16. print(re.get())
  17. p.close()

multiprocessing.Pool常用函数解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

0x04:进程间不能共享全局变量

  1. from multiprocessing import Process
  2. import os
  3. nums = [11, 22]
  4. def work1():
  5. """子进程要执行的代码"""
  6. print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
  7. for i in range(3):
  8. nums.append(i)
  9. print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
  10. def work2():
  11. """子进程要执行的代码"""
  12. nums.pop()
  13. print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))
  14. if __name__ == '__main__':
  15. p1 = Process(target=work1)
  16. p1.start()
  17. p1.join()
  18. p2 = Process(target=work2)
  19. p2.start()
  20. print('in process0 pid={} ,nums={}'.format(os.getpid(),nums))
  1. in process1 pid=2707 ,nums=[11, 22]
  2. in process1 pid=2707 ,nums=[11, 22, 0]
  3. in process1 pid=2707 ,nums=[11, 22, 0, 1]
  4. in process1 pid=2707 ,nums=[11, 22, 0, 1, 2]
  5. in process0 pid=2706 ,nums=[11, 22]
  6. in process2 pid=2708 ,nums=[11]

0x05:进程间通信-Queue

  1. from multiprocessing import Queue
  2. q=Queue(3) #初始化一个Queue对象,最多可接收三条put消息
  3. q.put("消息1")
  4. q.put("消息2")
  5. print(q.full()) #False
  6. q.put("消息3")
  7. print(q.full()) #True
  8. #因为消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会立刻抛出异常
  9. try:
  10. q.put("消息4",True,2)
  11. except:
  12. print("消息列队已满,现有消息数量:%s"%q.qsize())
  13. try:
  14. q.put_nowait("消息4")
  15. except:
  16. print("消息列队已满,现有消息数量:%s"%q.qsize())
  17. #推荐的方式,先判断消息列队是否已满,再写入
  18. if not q.full():
  19. q.put_nowait("消息4")
  20. #读取消息时,先判断消息列队是否为空,再读取
  21. if not q.empty():
  22. for i in range(q.qsize()):
  23. print(q.get_nowait())

说明

初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

  • Queue.qsize():返回当前队列包含的消息数量;
  • Queue.empty():如果队列为空,返回True,反之False ;
  • Queue.full():如果队列满了,返回True,反之False;
  • Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;

1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出”Queue.Empty”异常;.

2)如果block值为False,消息列队如果为空,则会立刻抛出”Queue.Empty”异常;

  • Queue.get_nowait():相当Queue.get(False);
  • Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出”Queue.Full”异常;

2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出”Queue.Full”异常;

  • Queue.put_nowait(item):相当Queue.put(item, False);

    0x06:使用Queue实现进程共享

    我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据: ``` from multiprocessing import Process, Queue import os, time, random

写数据进程执行的代码:

def write(q): for value in [‘A’, ‘B’, ‘C’]: print(‘Put %s to queue…’ % value) q.put(value) time.sleep(random.random())

读数据进程执行的代码:

def read(q): while True: if not q.empty(): value = q.get(True) print(‘Get %s from queue.’ % value) time.sleep(random.random()) else: break

if name==’main‘:

  1. # 父进程创建Queue,并传给各个子进程:
  2. q = Queue()
  3. pw = Process(target=write, args=(q,))
  4. pr = Process(target=read, args=(q,))
  5. # 启动子进程pw,写入:
  6. pw.start()
  7. # 等待pw结束:
  8. pw.join()
  9. # 启动子进程pr,读取:
  10. pr.start()
  11. pr.join()
  12. print('所有数据都写入并且读完')

```