添加进程 Process
p1 = mp.Process(target=job,args=(1,2))p1.start()p1.join()
定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错
p1 = mp.Process(target=job,args=(q,))p2 = mp.Process(target=job,args=(q,))
进程池 Pool() 和 map()
然后我们定义一个Pool
pool = mp.Pool()
有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。
接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果
res = pool.map(job, range(10))
让我们来运行一下
def multicore():pool = mp.Pool()res = pool.map(job, range(10))print(res)if __name__ == '__main__':multicore()
运行结果:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
自定义核数量
我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况
打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,
def multicore():pool = mp.Pool(processes=3) # 定义CPU核数量为3res = pool.map(job, range(10))print(res)
apply_async()
Pool除了map()外,还有可以返回结果的方式,那就是apply_async().
apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值
def multicore():pool = mp.Pool()res = pool.map(job, range(10))print(res)res = pool.apply_async(job, (2,))# 用get获得结果print(res.get())
运行结果;
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()4 # apply_async()
用 apply_async() 输出多个结果
那么如何用apply_async()输出多个迭代呢?
我们在apply_async()中多传入几个值试试
res = pool.apply_async(job, (2,3,4,))
结果会报错:
TypeError: job() takes exactly 1 argument (3 given)
即apply_async()只能输入一组参数。
在此我们将apply_async() 放入迭代器中,定义一个新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
同样在取出值时需要一个一个取出来
print([res.get() for res in multi_res])
Shared Value
我们可以通过使用Value数据存储在一个共享的内存表中。
import multiprocessing as mpvalue1 = mp.Value('i', 0)value2 = mp.Value('d', 3.14)
其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。
Shared Array
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。
array = mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。
参考数据形式
各参数代表的数据类型
| Type code | C Type | Python Type | Minimum size in bytes || --------- | ------------------ | ----------------- | --------------------- || `'b'` | signed char | int | 1 || `'B'` | unsigned char | int | 1 || `'u'` | Py_UNICODE | Unicode character | 2 || `'h'` | signed short | int | 2 || `'H'` | unsigned short | int | 2 || `'i'` | signed int | int | 2 || `'I'` | unsigned int | int | 2 || `'l'` | signed long | int | 4 || `'L'` | unsigned long | int | 4 || `'q'` | signed long long | int | 8 || `'Q'` | unsigned long long | int | 8 || `'f'` | float | float | 4 || `'d'` | double | float | 8 |
进程锁
首先需要定义一个进程锁
l = mp.Lock() # 定义一个进程锁
然后将进程锁的信息传入各个进程中
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入p2 = mp.Process(target=job, args=(v,3,l))
在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
def job(v, num, l):l.acquire() # 锁住for _ in range(5):time.sleep(0.1)v.value += num # v.value获取共享内存print(v.value)l.release() # 释放
