进程数和 CPU 核数相等时效率最高。

CPU 密集型适合用多进程,因为可以发挥多核的优势进行并行计算。
IO 密集型就没必要用多进程了,多线程足以。而且由于多进程切换代价高,性能可能反而不如多线程。

创建进程

os.fork

最底层的多进程编程可以使用os.fork()

  1. import os
  2. import time
  3. pid = os.fork() # 从这个地方开始,进程变成两个,子进程完全克隆父进程的数据、代码等
  4. print('bobby')
  5. if pid == 0: # 通过 pid 是否为 0 判断是否子进程
  6. print(f'子进程 id: {os.getpid()},父进程 id: {os.getppid()}')
  7. else:
  8. print(f'我是父进程:{os.getpid()}')
  9. time.sleep(2)

从调用os.fork()的地方开始,会创建一个子进程,子进程完全克隆父进程的数据、代码等。
子进程从os.fork()的地方开始执行,而非从头开始执行。

:::info

  1. os.fork()实际上是使用了fork系统调用来创建子进程。
  2. os.fork()只能用于类 unix 系统,因为 windows 上没有fork系统调用。 :::

通过os.fork只是了解多进程的原理,实际编程中不会使用这种方式,因为太底层了。这种方式甚至不能跨平台。

multiprocessing.Process

multiprocessing.Process对标多线程编程中的threading.Thread,基本使用方法与其类似:

  1. import time
  2. from multiprocessing import Process
  3. class Downloader(Process):
  4. def __init__(self, sleep_time):
  5. super().__init__()
  6. self.__sleep_time = sleep_time
  7. def run(self):
  8. time.sleep(self.__sleep_time)
  9. print("sub process %d is finished" % self.pid)
  10. if __name__ == "__main__": # Windows 下多进程编程必须写在这里,Linux 下无所谓
  11. d = Downloader(1)
  12. print(d.pid) # None,因为进程还未启动
  13. d.start()
  14. print(d.pid) # 可以获取到进程的 pid
  15. d.join()
  16. print("main process is finished")

multiprocessing.Pool

可以用来创建进程池。

基本使用:

  1. import time
  2. from multiprocessing import Pool
  3. def download(sleep_time):
  4. time.sleep(sleep_time)
  5. print("download %d is finished" % sleep_time)
  6. return sleep_time
  7. if __name__ == "__main__":
  8. pool = Pool(4) # 如果未指定进程数,默认为 cpu 个数
  9. results = []
  10. for i in [3, 1, 2]:
  11. r = pool.apply_async(download, args=(i,)) # 提交任务,返回一个类似 future 的对象,用来存储结果的
  12. results.append(r)
  13. pool.close() # pool.join() 之前需要先调用该方法将进程池关闭
  14. pool.join() # 将进程池内的进程都调一遍 join() 方法
  15. for r in results:
  16. print(r.get()) # get() 方法获取进程返回的结果

imap

  1. import time
  2. from multiprocessing import Pool
  3. def download(sleep_time):
  4. time.sleep(sleep_time)
  5. print("download %d is finished" % sleep_time)
  6. return sleep_time
  7. if __name__ == "__main__":
  8. pool = Pool()
  9. iter = pool.imap(download, [1, 5, 3]) # 创建多个任务放入进程池并开始执行,返回一个迭代器
  10. for r in iter: # 遍历该迭代器可以 按顺序 获取到结果。(只是获取结果是按顺序的,进程执行仍然是无序的)
  11. print(r)

imap_unordered

  1. import time
  2. from multiprocessing import Pool
  3. def download(sleep_time):
  4. time.sleep(sleep_time)
  5. print("download %d is finished" % sleep_time)
  6. return sleep_time
  7. if __name__ == "__main__":
  8. pool = Pool()
  9. iter = pool.imap_unordered(download, [1, 5, 3]) # 创建多个任务放入进程池并开始执行,返回一个迭代器
  10. for r in iter: # 遍历该迭代器可以获取到结果,哪个进程先执行完就先获取哪个
  11. print(r)

ProcessPoolExecutor

ThreadPoolExecutor一样,Python 还提供ProcessPoolExecutor,它的使用方式和ThreadPoolExecutor基本一模一样,极大简化多进程编程。

ProcessPoolExecutor实际上是对multiprocessing.Pool的包装。
提供它的意义之一是让进程池、线程池拥有同样的编程接口,易于使用。

多进程编程推荐这种方式。

进程通信

进程通信,全局变量+锁的方法就行不通了。

Queue

多线程编程中使用的queue.Queue在多进程编程中是无法使用的,强行用会报错:
image.png

multiprocessing提供了一个Queue可以用于多进程,它的使用方法与queue.Queue几乎完全一致。

  1. from multiprocessing import Process, Queue
  2. def producer(queue):
  3. queue.put("a")
  4. def consumer(queue):
  5. data = queue.get()
  6. print(data)
  7. if __name__ == "__main__":
  8. q = Queue()
  9. p = Process(target=producer, args=(q,))
  10. c = Process(target=consumer, args=(q,))
  11. p.start()
  12. c.start()
  13. p.join()
  14. c.join()

注意:这个Queue无法用于Pool创建的进程池中的进程的通信。
以下代码不会有输出:

  1. from multiprocessing import Pool, Queue
  2. def producer(queue):
  3. queue.put("a")
  4. def consumer(queue):
  5. data = queue.get()
  6. print(data)
  7. if __name__ == "__main__":
  8. q = Queue()
  9. pool = Pool(2)
  10. pool.apply_async(producer, args=(q,))
  11. pool.apply_async(consumer, args=(q,))
  12. pool.close()
  13. pool.join()

Manager().Queue可以用于Pool创建的进程池中进程的通信:

  1. from multiprocessing import Pool, Manager
  2. def producer(queue):
  3. queue.put("a")
  4. def consumer(queue):
  5. data = queue.get()
  6. print(data)
  7. if __name__ == "__main__":
  8. q = Manager().Queue()
  9. pool = Pool(2)
  10. pool.apply_async(producer, args=(q,))
  11. pool.apply_async(consumer, args=(q,))
  12. pool.close()
  13. pool.join()

Pipe

Pipe只能用于两个进程间的通信,其性能要高于Queue,因为Queue加了很多锁。

创建Pipe

  1. from multiprocessing import Pipe
  2. pipe1, pipe2 = Pipe() # 创建一个 双工通道
  3. recv_pipe, send_pipe = Pipe(duplex=False) # 创建一个 单工通道

双工通道两端都可以发送和接收数据;
单工通道一边只能发送数据,另一边只能接收数据。

在 linux 平台上,双工通道底层是 socket,单工通道基于 linux 系统的管道。

使用Pipe通信的示例:

  1. from multiprocessing import Process, Pipe
  2. def producer(pipe):
  3. pipe.send("a")
  4. def consumer(pipe):
  5. data = pipe.recv()
  6. print(data)
  7. if __name__ == "__main__":
  8. pipe1, pipe2 = Pipe()
  9. p = Process(target=producer, args=(pipe1,))
  10. c = Process(target=consumer, args=(pipe2,))
  11. p.start()
  12. c.start()
  13. p.join()
  14. c.join()

Manager

使用Manager可以让进程间共享变量,用多线程编程中全局变量类似的方式来通信。
Manager中定义了常用的数据结构的类。
Manager的原理是再创建一个进程去维护这些公共变量。每实例化一个Manager就创建这样一个进程。

示例:

  1. from multiprocessing import Process, Manager
  2. def producer(ls, lock):
  3. for i in range(1000):
  4. lock.acquire()
  5. ls[0] += 1
  6. lock.release()
  7. def consumer(ls, lock):
  8. for i in range(1000):
  9. lock.acquire()
  10. ls[0] -= 1
  11. lock.release()
  12. if __name__ == "__main__":
  13. manager = Manager()
  14. ls = manager.list()
  15. lock = manager.Lock()
  16. ls.append(0)
  17. p = Process(target=producer, args=(ls, lock))
  18. c = Process(target=consumer, args=(ls, lock))
  19. p.start()
  20. c.start()
  21. p.join()
  22. c.join()
  23. print(ls[0])

Manager 提供的数据结构:

  1. class SyncManager(BaseManager, ContextManager[SyncManager]):
  2. def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
  3. def Condition(self, lock: Any = ...) -> threading.Condition: ...
  4. def Event(self) -> threading.Event: ...
  5. def Lock(self) -> threading.Lock: ...
  6. def Namespace(self) -> _Namespace: ...
  7. def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ...
  8. def RLock(self) -> threading.RLock: ...
  9. def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...
  10. def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...
  11. def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ...
  12. def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...
  13. def list(self, sequence: Sequence[_T] = ...) -> List[_T]: ...

这些数据类型的进程安全性和对应的普通数据结构相同。
比如这里的dictlist不是进程安全的,使用时要结合锁使用。Queue是进程安全的。

这些类型实际上是通过SyncManager.register将普通数据结构注册上去的。