进程数和 CPU 核数相等时效率最高。
CPU 密集型适合用多进程,因为可以发挥多核的优势进行并行计算。
IO 密集型就没必要用多进程了,多线程足以。而且由于多进程切换代价高,性能可能反而不如多线程。
创建进程
os.fork
最底层的多进程编程可以使用os.fork()
:
import os
import time
pid = os.fork() # 从这个地方开始,进程变成两个,子进程完全克隆父进程的数据、代码等
print('bobby')
if pid == 0: # 通过 pid 是否为 0 判断是否子进程
print(f'子进程 id: {os.getpid()},父进程 id: {os.getppid()}')
else:
print(f'我是父进程:{os.getpid()}')
time.sleep(2)
从调用os.fork()
的地方开始,会创建一个子进程,子进程完全克隆父进程的数据、代码等。
子进程从os.fork()
的地方开始执行,而非从头开始执行。
:::info
os.fork()
实际上是使用了fork
系统调用来创建子进程。os.fork()
只能用于类 unix 系统,因为 windows 上没有fork
系统调用。 :::
通过os.fork
只是了解多进程的原理,实际编程中不会使用这种方式,因为太底层了。这种方式甚至不能跨平台。
multiprocessing.Process
multiprocessing.Process
对标多线程编程中的threading.Thread
,基本使用方法与其类似:
import time
from multiprocessing import Process
class Downloader(Process):
def __init__(self, sleep_time):
super().__init__()
self.__sleep_time = sleep_time
def run(self):
time.sleep(self.__sleep_time)
print("sub process %d is finished" % self.pid)
if __name__ == "__main__": # Windows 下多进程编程必须写在这里,Linux 下无所谓
d = Downloader(1)
print(d.pid) # None,因为进程还未启动
d.start()
print(d.pid) # 可以获取到进程的 pid
d.join()
print("main process is finished")
multiprocessing.Pool
可以用来创建进程池。
基本使用:
import time
from multiprocessing import Pool
def download(sleep_time):
time.sleep(sleep_time)
print("download %d is finished" % sleep_time)
return sleep_time
if __name__ == "__main__":
pool = Pool(4) # 如果未指定进程数,默认为 cpu 个数
results = []
for i in [3, 1, 2]:
r = pool.apply_async(download, args=(i,)) # 提交任务,返回一个类似 future 的对象,用来存储结果的
results.append(r)
pool.close() # pool.join() 之前需要先调用该方法将进程池关闭
pool.join() # 将进程池内的进程都调一遍 join() 方法
for r in results:
print(r.get()) # get() 方法获取进程返回的结果
imap
:
import time
from multiprocessing import Pool
def download(sleep_time):
time.sleep(sleep_time)
print("download %d is finished" % sleep_time)
return sleep_time
if __name__ == "__main__":
pool = Pool()
iter = pool.imap(download, [1, 5, 3]) # 创建多个任务放入进程池并开始执行,返回一个迭代器
for r in iter: # 遍历该迭代器可以 按顺序 获取到结果。(只是获取结果是按顺序的,进程执行仍然是无序的)
print(r)
imap_unordered
:
import time
from multiprocessing import Pool
def download(sleep_time):
time.sleep(sleep_time)
print("download %d is finished" % sleep_time)
return sleep_time
if __name__ == "__main__":
pool = Pool()
iter = pool.imap_unordered(download, [1, 5, 3]) # 创建多个任务放入进程池并开始执行,返回一个迭代器
for r in iter: # 遍历该迭代器可以获取到结果,哪个进程先执行完就先获取哪个
print(r)
ProcessPoolExecutor
和ThreadPoolExecutor
一样,Python 还提供ProcessPoolExecutor
,它的使用方式和ThreadPoolExecutor
基本一模一样,极大简化多进程编程。
ProcessPoolExecutor
实际上是对multiprocessing.Pool
的包装。
提供它的意义之一是让进程池、线程池拥有同样的编程接口,易于使用。
多进程编程推荐这种方式。
进程通信
进程通信,全局变量+锁的方法就行不通了。
Queue
多线程编程中使用的queue.Queue
在多进程编程中是无法使用的,强行用会报错:
multiprocessing
提供了一个Queue
可以用于多进程,它的使用方法与queue.Queue
几乎完全一致。
from multiprocessing import Process, Queue
def producer(queue):
queue.put("a")
def consumer(queue):
data = queue.get()
print(data)
if __name__ == "__main__":
q = Queue()
p = Process(target=producer, args=(q,))
c = Process(target=consumer, args=(q,))
p.start()
c.start()
p.join()
c.join()
注意:这个Queue
无法用于Pool
创建的进程池中的进程的通信。
以下代码不会有输出:
from multiprocessing import Pool, Queue
def producer(queue):
queue.put("a")
def consumer(queue):
data = queue.get()
print(data)
if __name__ == "__main__":
q = Queue()
pool = Pool(2)
pool.apply_async(producer, args=(q,))
pool.apply_async(consumer, args=(q,))
pool.close()
pool.join()
Manager().Queue
可以用于Pool
创建的进程池中进程的通信:
from multiprocessing import Pool, Manager
def producer(queue):
queue.put("a")
def consumer(queue):
data = queue.get()
print(data)
if __name__ == "__main__":
q = Manager().Queue()
pool = Pool(2)
pool.apply_async(producer, args=(q,))
pool.apply_async(consumer, args=(q,))
pool.close()
pool.join()
Pipe
Pipe
只能用于两个进程间的通信,其性能要高于Queue
,因为Queue
加了很多锁。
创建Pipe
:
from multiprocessing import Pipe
pipe1, pipe2 = Pipe() # 创建一个 双工通道
recv_pipe, send_pipe = Pipe(duplex=False) # 创建一个 单工通道
双工通道两端都可以发送和接收数据;
单工通道一边只能发送数据,另一边只能接收数据。
在 linux 平台上,双工通道底层是 socket,单工通道基于 linux 系统的管道。
使用Pipe
通信的示例:
from multiprocessing import Process, Pipe
def producer(pipe):
pipe.send("a")
def consumer(pipe):
data = pipe.recv()
print(data)
if __name__ == "__main__":
pipe1, pipe2 = Pipe()
p = Process(target=producer, args=(pipe1,))
c = Process(target=consumer, args=(pipe2,))
p.start()
c.start()
p.join()
c.join()
Manager
使用Manager
可以让进程间共享变量,用多线程编程中全局变量类似的方式来通信。Manager
中定义了常用的数据结构的类。Manager
的原理是再创建一个进程去维护这些公共变量。每实例化一个Manager
就创建这样一个进程。
示例:
from multiprocessing import Process, Manager
def producer(ls, lock):
for i in range(1000):
lock.acquire()
ls[0] += 1
lock.release()
def consumer(ls, lock):
for i in range(1000):
lock.acquire()
ls[0] -= 1
lock.release()
if __name__ == "__main__":
manager = Manager()
ls = manager.list()
lock = manager.Lock()
ls.append(0)
p = Process(target=producer, args=(ls, lock))
c = Process(target=consumer, args=(ls, lock))
p.start()
c.start()
p.join()
c.join()
print(ls[0])
Manager 提供的数据结构:
class SyncManager(BaseManager, ContextManager[SyncManager]):
def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
def Condition(self, lock: Any = ...) -> threading.Condition: ...
def Event(self) -> threading.Event: ...
def Lock(self) -> threading.Lock: ...
def Namespace(self) -> _Namespace: ...
def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ...
def RLock(self) -> threading.RLock: ...
def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...
def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...
def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ...
def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...
def list(self, sequence: Sequence[_T] = ...) -> List[_T]: ...
这些数据类型的进程安全性和对应的普通数据结构相同。
比如这里的dict
、list
不是进程安全的,使用时要结合锁使用。Queue
是进程安全的。
这些类型实际上是通过SyncManager.register
将普通数据结构注册上去的。