添加进程 Process

  1. p1 = mp.Process(target=job,args=(1,2))
  2. p1.start()
  3. p1.join()

定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错

  1. p1 = mp.Process(target=job,args=(q,))
  2. p2 = mp.Process(target=job,args=(q,))

进程池 Pool() 和 map()

然后我们定义一个Pool

  1. pool = mp.Pool()

有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。

接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果

  1. res = pool.map(job, range(10))

让我们来运行一下

  1. def multicore():
  2. pool = mp.Pool()
  3. res = pool.map(job, range(10))
  4. print(res)
  5. if __name__ == '__main__':
  6. multicore()

运行结果:

  1. [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

自定义核数量

我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况

打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)

Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,

  1. def multicore():
  2. pool = mp.Pool(processes=3) # 定义CPU核数量为3
  3. res = pool.map(job, range(10))
  4. print(res)

apply_async()

Pool除了map()外,还有可以返回结果的方式,那就是apply_async().

apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

  1. def multicore():
  2. pool = mp.Pool()
  3. res = pool.map(job, range(10))
  4. print(res)
  5. res = pool.apply_async(job, (2,))
  6. # 用get获得结果
  7. print(res.get())

运行结果;

  1. [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()
  2. 4 # apply_async()

用 apply_async() 输出多个结果

那么如何用apply_async()输出多个迭代呢?

我们在apply_async()中多传入几个值试试

  1. res = pool.apply_async(job, (2,3,4,))

结果会报错:

  1. TypeError: job() takes exactly 1 argument (3 given)

apply_async()只能输入一组参数。

在此我们将apply_async() 放入迭代器中,定义一个新的multi_res

  1. multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

同样在取出值时需要一个一个取出来

  1. print([res.get() for res in multi_res])

Shared Value

我们可以通过使用Value数据存储在一个共享的内存表中。

  1. import multiprocessing as mp
  2. value1 = mp.Value('i', 0)
  3. value2 = mp.Value('d', 3.14)

其中di参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。

Shared Array

在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。

  1. array = mp.Array('i', [1, 2, 3, 4])

这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。

参考数据形式

各参数代表的数据类型

  1. | Type code | C Type | Python Type | Minimum size in bytes |
  2. | --------- | ------------------ | ----------------- | --------------------- |
  3. | `'b'` | signed char | int | 1 |
  4. | `'B'` | unsigned char | int | 1 |
  5. | `'u'` | Py_UNICODE | Unicode character | 2 |
  6. | `'h'` | signed short | int | 2 |
  7. | `'H'` | unsigned short | int | 2 |
  8. | `'i'` | signed int | int | 2 |
  9. | `'I'` | unsigned int | int | 2 |
  10. | `'l'` | signed long | int | 4 |
  11. | `'L'` | unsigned long | int | 4 |
  12. | `'q'` | signed long long | int | 8 |
  13. | `'Q'` | unsigned long long | int | 8 |
  14. | `'f'` | float | float | 4 |
  15. | `'d'` | double | float | 8 |

进程锁

首先需要定义一个进程锁

  1. l = mp.Lock() # 定义一个进程锁

然后将进程锁的信息传入各个进程中

  1. p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
  2. p2 = mp.Process(target=job, args=(v,3,l))

job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

  1. def job(v, num, l):
  2. l.acquire() # 锁住
  3. for _ in range(5):
  4. time.sleep(0.1)
  5. v.value += num # v.value获取共享内存
  6. print(v.value)
  7. l.release() # 释放