Barrier就是使两个进程同时进行。假设一个进程在之前的任务跑的快了,遇到b.wait()
的时候就会等待另一个进程。
from multiprocessing import Queue, Lock, Barrier
import multiprocessing as mp
import time
import os
import torch
# function1 计数并打印
def pprint(r, b):
b.wait()
print('pid:{}, time:{}'.format(os.getpid(), time.time()))
for i in range(r):
print('thread %s: %d' % (os.getpid(), i))
time.sleep(0.5)
print('func1 运行完成')
# function2 将数据放入列表
def f(queue:Queue, b):
b.wait()
print('pid:{}, time:{}'.format(os.getpid(), time.time()))
queue.put(torch.tensor([1,1,1]))
print('func2 运行完成')
if __name__ == '__main__':
l = Lock()
b = Barrier(2)
q = Queue()
process1 = mp.Process(target=pprint, args=(5, b))
process2 = mp.Process(target=f, args=(q, b))
process1.start()
process2.start()
process1.join()
process2.join()
pid:5896, time:1648986323.05494pid:1808, time:1648986323.05494
thread 1808: 0
func2 运行完成
thread 1808: 1
thread 1808: 2
thread 1808: 3
thread 1808: 4
func1 运行完成
进程1比进程2先开始,但是由于函数内有Barrier,需要等待进程2,所以Barrier使得两个进程同时开始。
当然Barrier也可以放在主函数内进行调用,使得所有的线程(不包含主线程)完成任务后再由主线程统一继续的任务。