进程数和 CPU 核数相等时效率最高。
CPU 密集型适合用多进程,因为可以发挥多核的优势进行并行计算。
IO 密集型就没必要用多进程了,多线程足以。而且由于多进程切换代价高,性能可能反而不如多线程。
创建进程
os.fork
最底层的多进程编程可以使用os.fork():
import osimport timepid = os.fork() # 从这个地方开始,进程变成两个,子进程完全克隆父进程的数据、代码等print('bobby')if pid == 0: # 通过 pid 是否为 0 判断是否子进程print(f'子进程 id: {os.getpid()},父进程 id: {os.getppid()}')else:print(f'我是父进程:{os.getpid()}')time.sleep(2)
从调用os.fork()的地方开始,会创建一个子进程,子进程完全克隆父进程的数据、代码等。
子进程从os.fork()的地方开始执行,而非从头开始执行。
:::info
os.fork()实际上是使用了fork系统调用来创建子进程。os.fork()只能用于类 unix 系统,因为 windows 上没有fork系统调用。 :::
通过os.fork只是了解多进程的原理,实际编程中不会使用这种方式,因为太底层了。这种方式甚至不能跨平台。
multiprocessing.Process
multiprocessing.Process对标多线程编程中的threading.Thread,基本使用方法与其类似:
import timefrom multiprocessing import Processclass Downloader(Process):def __init__(self, sleep_time):super().__init__()self.__sleep_time = sleep_timedef run(self):time.sleep(self.__sleep_time)print("sub process %d is finished" % self.pid)if __name__ == "__main__": # Windows 下多进程编程必须写在这里,Linux 下无所谓d = Downloader(1)print(d.pid) # None,因为进程还未启动d.start()print(d.pid) # 可以获取到进程的 pidd.join()print("main process is finished")
multiprocessing.Pool
可以用来创建进程池。
基本使用:
import timefrom multiprocessing import Pooldef download(sleep_time):time.sleep(sleep_time)print("download %d is finished" % sleep_time)return sleep_timeif __name__ == "__main__":pool = Pool(4) # 如果未指定进程数,默认为 cpu 个数results = []for i in [3, 1, 2]:r = pool.apply_async(download, args=(i,)) # 提交任务,返回一个类似 future 的对象,用来存储结果的results.append(r)pool.close() # pool.join() 之前需要先调用该方法将进程池关闭pool.join() # 将进程池内的进程都调一遍 join() 方法for r in results:print(r.get()) # get() 方法获取进程返回的结果
imap:
import timefrom multiprocessing import Pooldef download(sleep_time):time.sleep(sleep_time)print("download %d is finished" % sleep_time)return sleep_timeif __name__ == "__main__":pool = Pool()iter = pool.imap(download, [1, 5, 3]) # 创建多个任务放入进程池并开始执行,返回一个迭代器for r in iter: # 遍历该迭代器可以 按顺序 获取到结果。(只是获取结果是按顺序的,进程执行仍然是无序的)print(r)
imap_unordered:
import timefrom multiprocessing import Pooldef download(sleep_time):time.sleep(sleep_time)print("download %d is finished" % sleep_time)return sleep_timeif __name__ == "__main__":pool = Pool()iter = pool.imap_unordered(download, [1, 5, 3]) # 创建多个任务放入进程池并开始执行,返回一个迭代器for r in iter: # 遍历该迭代器可以获取到结果,哪个进程先执行完就先获取哪个print(r)
ProcessPoolExecutor
和ThreadPoolExecutor一样,Python 还提供ProcessPoolExecutor,它的使用方式和ThreadPoolExecutor基本一模一样,极大简化多进程编程。
ProcessPoolExecutor实际上是对multiprocessing.Pool的包装。
提供它的意义之一是让进程池、线程池拥有同样的编程接口,易于使用。
多进程编程推荐这种方式。
进程通信
进程通信,全局变量+锁的方法就行不通了。
Queue
多线程编程中使用的queue.Queue在多进程编程中是无法使用的,强行用会报错:
multiprocessing提供了一个Queue可以用于多进程,它的使用方法与queue.Queue几乎完全一致。
from multiprocessing import Process, Queuedef producer(queue):queue.put("a")def consumer(queue):data = queue.get()print(data)if __name__ == "__main__":q = Queue()p = Process(target=producer, args=(q,))c = Process(target=consumer, args=(q,))p.start()c.start()p.join()c.join()
注意:这个Queue无法用于Pool创建的进程池中的进程的通信。
以下代码不会有输出:
from multiprocessing import Pool, Queuedef producer(queue):queue.put("a")def consumer(queue):data = queue.get()print(data)if __name__ == "__main__":q = Queue()pool = Pool(2)pool.apply_async(producer, args=(q,))pool.apply_async(consumer, args=(q,))pool.close()pool.join()
Manager().Queue可以用于Pool创建的进程池中进程的通信:
from multiprocessing import Pool, Managerdef producer(queue):queue.put("a")def consumer(queue):data = queue.get()print(data)if __name__ == "__main__":q = Manager().Queue()pool = Pool(2)pool.apply_async(producer, args=(q,))pool.apply_async(consumer, args=(q,))pool.close()pool.join()
Pipe
Pipe只能用于两个进程间的通信,其性能要高于Queue,因为Queue加了很多锁。
创建Pipe:
from multiprocessing import Pipepipe1, pipe2 = Pipe() # 创建一个 双工通道recv_pipe, send_pipe = Pipe(duplex=False) # 创建一个 单工通道
双工通道两端都可以发送和接收数据;
单工通道一边只能发送数据,另一边只能接收数据。
在 linux 平台上,双工通道底层是 socket,单工通道基于 linux 系统的管道。
使用Pipe通信的示例:
from multiprocessing import Process, Pipedef producer(pipe):pipe.send("a")def consumer(pipe):data = pipe.recv()print(data)if __name__ == "__main__":pipe1, pipe2 = Pipe()p = Process(target=producer, args=(pipe1,))c = Process(target=consumer, args=(pipe2,))p.start()c.start()p.join()c.join()
Manager
使用Manager可以让进程间共享变量,用多线程编程中全局变量类似的方式来通信。Manager中定义了常用的数据结构的类。Manager的原理是再创建一个进程去维护这些公共变量。每实例化一个Manager就创建这样一个进程。
示例:
from multiprocessing import Process, Managerdef producer(ls, lock):for i in range(1000):lock.acquire()ls[0] += 1lock.release()def consumer(ls, lock):for i in range(1000):lock.acquire()ls[0] -= 1lock.release()if __name__ == "__main__":manager = Manager()ls = manager.list()lock = manager.Lock()ls.append(0)p = Process(target=producer, args=(ls, lock))c = Process(target=consumer, args=(ls, lock))p.start()c.start()p.join()c.join()print(ls[0])
Manager 提供的数据结构:
class SyncManager(BaseManager, ContextManager[SyncManager]):def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...def Condition(self, lock: Any = ...) -> threading.Condition: ...def Event(self) -> threading.Event: ...def Lock(self) -> threading.Lock: ...def Namespace(self) -> _Namespace: ...def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ...def RLock(self) -> threading.RLock: ...def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ...def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...def list(self, sequence: Sequence[_T] = ...) -> List[_T]: ...
这些数据类型的进程安全性和对应的普通数据结构相同。
比如这里的dict、list不是进程安全的,使用时要结合锁使用。Queue是进程安全的。
这些类型实际上是通过SyncManager.register将普通数据结构注册上去的。
