简介

  • 进程:资源调度的基本单位,亦是一个程序在一个数据集上的一次动态执行过程,如下每一个应用程序即一个进程。同时一个主进程下,可能会有多个子进程。

image.png

并发与并行

  • 进程就像工厂(即CPU核心数,一般为8核,4核等),为程序进行工作,当工厂数量很多的时候,可以一个工厂做一个程序的工作,称之为并行。

    1. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1325594/1594259389438-6a315984-bbb1-4329-a969-be3a7043ffa5.png#align=left&display=inline&height=159&margin=%5Bobject%20Object%5D&name=image.png&originHeight=338&originWidth=548&size=33754&status=done&style=none&width=257)
  • 如果CPU是单核,但是想要同时运行多个应用,就需要再两个程序间切换,只是切换的速度特别快,人眼一般体会不到,这就称之为并发。

    image.png

普通多函数

  1. import multiprocessing
  2. import time
  3. def func1():
  4. while True:
  5. print("running func1")
  6. time.sleep(1)
  7. def func2():
  8. while True:
  9. print("running func2")
  10. time.sleep(1)
  11. if __name__ == '__main__':
  12. func1()
  13. func2()
  • 运行结果:

image.png

  • 如上可知,程序一直执行func1,造成阻塞。不会运行func2,如果想要func1和func2同时运行,可以使用多进程。
  • 阻塞与非阻塞

    • 阻塞:下一个任务执行需要等待前一个任务执行结束
    • 非阻塞:与阻塞相反

      多进程使用

  • 创建子进程

    • p = multiprocessing.Process(target=func1, args=())
  • 开启进程
    • p.start()
  • 查看进程
    • os.getpid()
  • 等待子进程结束 timeout 等待超时
    • p.join(timeout=5)
  • 关闭进程
    • p.close()
  • 守护进程 再主进程结束后结束子进程
    • p.daemon = True
  1. import multiprocessing
  2. import time
  3. import os
  4. def func1():
  5. print("当前进程:", os.getpid())
  6. for i in range(3):
  7. print("running func1")
  8. time.sleep(1)
  9. def func2():
  10. print("当前进程:", os.getpid())
  11. for i in range(3):
  12. print("running func2")
  13. time.sleep(1)
  14. if __name__ == '__main__':
  15. p = multiprocessing.Process(target=func1, args=())
  16. p1 = multiprocessing.Process(target=func2, args=())
  17. # p.daemon = True
  18. # p1.daemon = True
  19. print("主进程:", os.getpid())
  20. p.start()
  21. p1.start()
  22. p.join()
  23. p1.join()
  24. p.close()
  25. p1.close()
  26. print("程序结束!")
  • 运行结果:

image.png

  • 由上图可知,主进程先执行,然后使用start开启子进程,主进程等待子进程结束之后再结束!
  • 如果使用了daemon的话,主进程不等待子进程运行完,就结束主进程

继承方式

  1. import multiprocessing
  2. import time
  3. import os
  4. class P1(multiprocessing.Process):
  5. def run(self):
  6. print("当前进程:", os.getpid())
  7. for i in range(3):
  8. print("running func1")
  9. time.sleep(1)
  10. class P2(multiprocessing.Process):
  11. def run(self):
  12. print("当前进程:", os.getpid())
  13. for i in range(3):
  14. print("running func2")
  15. time.sleep(1)
  16. if __name__ == '__main__':
  17. p1 = P1()
  18. p2 = P2()
  19. p1.daemon = True
  20. p2.daemon = True
  21. p1.start()
  22. p2.start()
  23. print("主进程:", os.getpid())
  24. time.sleep(2)
  25. print("程序结束!")
  1. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1325594/1594262132516-14353069-b26f-4af2-be92-73d314ae123b.png#align=left&display=inline&height=172&margin=%5Bobject%20Object%5D&name=image.png&originHeight=215&originWidth=715&size=46179&status=done&style=none&width=573)
  • 如上所示,使用了守护进程,在子进程还未结束的时候,主进程执行结束后即程序结束。主要守护进程需要在进程开启前使用

    使用进程池

  • 实例化

    • pool = multiprocessing.Pool()
  • 向进程池添加任务异步执行
    • pool.apply_asnyc(func=func, args=())
  • 关闭进程池 不能继续添加任务
    • pool.close()
  • 主进程等待子进程结束
    • pool.join()
  1. import multiprocessing
  2. import time
  3. import os
  4. def func1(num):
  5. print("当前进程:", os.getpid())
  6. for i in range(num):
  7. print(f"running func1 {num}")
  8. time.sleep(1)
  9. def func2(num):
  10. print("当前进程:", os.getpid())
  11. for i in range(num):
  12. print(f"running func2 {num}")
  13. time.sleep(1)
  14. if __name__ == '__main__':
  15. pool = multiprocessing.Pool(1)
  16. pool.apply_async(func=func1, args=(1,))
  17. pool.apply_async(func=func2, args=(2,))
  18. pool.close()
  19. pool.join()
  1. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1325594/1594263244287-8a17b2d8-39f9-4025-839c-550518510196.png#align=left&display=inline&height=153&margin=%5Bobject%20Object%5D&name=image.png&originHeight=186&originWidth=697&size=50228&status=done&style=none&width=573)
  • 需要注意参数元组传递方式

进程间通信

  • 进程与进程之间相互独立,资源不共享。进程与进程之间传递信息,可以使用队列,类似于列表可以存放数据,不过是先进先出。
    • 导包
      • from multiprocessing import Queue
    • 实例化
      • q = Queue()
    • 常用方法
      • q.put() 向队列添加一个数据 如果队列满了则阻塞
      • q.get() 从队列中取出一个数据 如果队列为空则阻塞
      • q.qsize() 查看队列大小
      • q.empty() 判断队列是否为空
      • q.put_nowait() 添加一个数据,如果队列满了则报错
      • q.get_nowait() 获取一个数据,如果队列为空则报错 ```python from multiprocessing import Queue import multiprocessing import time import os

def func1(q): print(“当前进程:”, os.getpid())

  1. for i in range(3):
  2. q.put(i)
  3. print(f"running func1 put {i}")
  4. time.sleep(1)

def func2(q): time.sleep(1) print(“当前进程:”, os.getpid())

  1. while q.qsize():
  2. n = q.get()
  3. print(f"running func2 get {n}")
  4. time.sleep(1)

if name == ‘main‘: q = Queue()

  1. p = multiprocessing.Process(target=func1, args=(q,))
  2. p1 = multiprocessing.Process(target=func2, args=(q, ))
  3. print("主进程:", os.getpid())
  4. p.start()
  5. p1.start()
  6. p.join()
  7. p1.join()
  8. p.close()
  9. p1.close()
  10. print("程序结束!")

```

  • 运行结果

    1. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1325594/1594264749245-d54e3634-3fd6-4448-ab10-5054ec61b14f.png#align=left&display=inline&height=108&margin=%5Bobject%20Object%5D&name=image.png&originHeight=244&originWidth=1291&size=55547&status=done&style=none&width=573)
    • 需要注意的是队列阻塞问题,因为两个进程同时开启,在func1还未添加数据到队列的时候,func2就判断队列是否为空,必然不会执行while语句里的 内容,所以让func1在put数据的时候,让func2休眠一会儿。

      进程同步

  • 进程之间虽然不共享资源 ,但是共享一套文件系统,比如对同一文件进行写入的时候,必然带来竞争导致错乱,需要对进程进行控制,需要用到进程锁

  • 使用:
    • 导包
      • from multiprocssing import Lock
      • 进程池导包方式
        • from multiprocssing.Manager() import Lock
    • 实例化
      • mutex = Lock()
    • 方法
      • mutex.aquire() 锁定
      • mutex。release() 释放