1.10。使用@jit 自动并行化

原文: http://numba.pydata.org/numba-doc/latest/user/parallel.html

jit() 设置并行选项可启用 Numba 转换过程,该过程尝试自动并行化并对函数(部分)执行其他优化。目前,此功能仅适用于 CPU。

用户定义的函数内的一些操作,例如,向数组添加标量值已知具有并行语义。用户程序可以包含许多这样的操作,并且虽然每个操作可以单独并行化,但是这种方法通常由于较差的高速缓存行为而具有低廉的性能。相反,通过自动并行化,Numba 尝试在用户程序中识别此类操作,并将相邻的操作融合在一起,以形成一个或多个并行自动运行的内核。该过程完全自动化而无需修改用户程序,这与 Numba 的 vectorize()guvectorize() 机制形成对比,其中需要手动操作来创建并行内核。

1.10.1。支持的操作

在本节中,我们列出了所有具有并行语义且我们尝试并行化的数组操作。

  1. 案例研究支持的所有 numba 数组操作:数组表达式,包括 Numpy 数组之间,数组和标量之间的常用算术函数,以及 Numpy ufuncs。它们通常被称为<cite>元素</cite>或<cite>逐点</cite>数组操作:

    > 一元算子:+ - ~ > 二元算子:+ - * / /? % | &gt;&gt; ^ &lt;&lt; & ** // > 比较运算符:== != &lt; &lt;= &gt; &gt;= > nopython 模式支持的 Numpy ufuncs。 > * 用户定义 DUFuncvectorize()

  2. Numpy 减少函数sumprodminmaxargminargmax。此外,数组数学函数meanvarstd

  3. Numpy 数组创建函数zerosonesarangelinspace和几个随机函数(rand,randn,ranf,random_sample,sample,random,standard_normal,chisquare,weibull,power,geometric,exponential,poisson ,瑞利,正常,均匀,贝塔,二项式,f,伽玛,对数正态,拉普拉斯,兰丁,三角形​​)。

  4. Numpy dot函数在矩阵和向量之间,或两个向量之间。在所有其他情况下,使用 Numba 的默认实现。

  5. 当操作数具有匹配的尺寸和大小时,上述操作也支持多维数组。不支持具有混合维度或大小的阵列之间的 Numpy 广播的完整语义,也不支持所选维度上的减少。

  6. 数组赋值,其中目标是使用切片或布尔数组的数组选择,并且指定的值是标量或其他选择,其中切片范围或位阵列被推断为兼容。

  7. functoolsreduce运算符支持指定 1D Numpy 数组的并行减少,但初始值参数是必需的。

1.10.2。显式并行循环

代码转换传递的另一个特性(当parallel=True时)支持显式并行循环。可以使用 Numba 的prange而不是range来指定循环可以并行化。除了支持的减少之外,用户需要确保循环没有交叉迭代依赖性。

如果变量由二元函数/运算符使用其在循环体中的先前值更新,则自动推断减少。对于+=*=运算符,自动推断减少的初始值。对于其他函数/运算符,reduce 变量应在进入prange循环之前保持标识值。对于标量和任意维度的数组,支持以这种方式减少。

下面的示例演示了一个带有缩减的并行循环(A是一维 Numpy 数组):

  1. from numba import njit, prange
  2. @njit(parallel=True)
  3. def prange_test(A):
  4. s = 0
  5. # Without "parallel=True" in the jit-decorator
  6. # the prange statement is equivalent to range
  7. for i in prange(A.shape[0]):
  8. s += A[i]
  9. return s

以下示例演示了二维数组的产品缩减:

  1. from numba import njit, prange
  2. import numpy as np
  3. @njit(parallel=True)
  4. def two_d_array_reduction_prod(n):
  5. shp = (13, 17)
  6. result1 = 2 * np.ones(shp, np.int_)
  7. tmp = 2 * np.ones_like(result1)
  8. for i in prange(n):
  9. result1 *= tmp
  10. return result1

1.10.3。示例

在本节中,我们举例说明此功能如何帮助并行化 Logistic 回归:

  1. @numba.jit(nopython=True, parallel=True)
  2. def logistic_regression(Y, X, w, iterations):
  3. for i in range(iterations):
  4. w -= np.dot(((1.0 / (1.0 + np.exp(-Y * np.dot(X, w))) - 1.0) * Y), X)
  5. return w

我们不会讨论算法的细节,而是关注该程序如何使用自动并行化:

  1. 输入Y是大小为N的向量,XN x D矩阵,w是大小为D的向量。
  2. 函数体是一个迭代循环,它更新变量w。循环体由一系列向量和矩阵运算组成。
  3. 内部dot操作产生一个大小为N的向量,然后是标量和大小为N的向量之间的一系列算术运算,或两个大小为N的向量。
  4. 外部dot产生一个大小为D的向量,然后在变量w上进行就地数组减法。
  5. 通过自动并行化,将生成大小为N的数组的所有操作融合在一起,成为单个并行内核。这包括内部dot操作和后面的所有逐点数组操作。
  6. 外部dot操作产生不同维度的结果数组,并且不与上述内核融合。

这里,利用并行硬件唯一需要的是为 jit() 设置并行选项,而不对logistic_regression功能本身进行修改。如果我们使用 guvectorize() 给出等价并行实现,则需要进行普遍的更改,重写代码以提取可并行化的内核计算,这既繁琐又具有挑战性。

1.10.4。诊断

注意

目前,并非所有并行变换和功能都可以通过代码生成过程进行跟踪。偶尔可能会丢失有关某些循环或变换的诊断信息。

jit()并行选项可以生成有关自动并行化修饰代码的变换的诊断信息。这个信息可以通过两种方式访问​​,第一种是通过设置环境变量 NUMBA_PARALLEL_DIAGNOSTICS ,第二种是通过调用 parallel_diagnostics() ,两种方法都给出相同的信息并打印至STDOUT。诊断信息中的详细程度由 1 到 4 之间的整数参数控制,其中 1 表示最小,4 表示最多。例如:

  1. @njit(parallel=True)
  2. def test(x):
  3. n = x.shape[0]
  4. a = np.sin(x)
  5. b = np.cos(a * a)
  6. acc = 0
  7. for i in prange(n - 2):
  8. for j in prange(n - 1):
  9. acc += b[i] + b[j + 1]
  10. return acc
  11. test(np.arange(10))
  12. test.parallel_diagnostics(level=4)

生产:

  1. ================================================================================
  2. ======= Parallel Accelerator Optimizing: Function test, example.py (4) =======
  3. ================================================================================
  4. Parallel loop listing for Function test, example.py (4)
  5. --------------------------------------|loop #ID
  6. @njit(parallel=True) |
  7. def test(x): |
  8. n = x.shape[0] |
  9. a = np.sin(x)---------------------| #0
  10. b = np.cos(a * a)-----------------| #1
  11. acc = 0 |
  12. for i in prange(n - 2):-----------| #3
  13. for j in prange(n - 1):-------| #2
  14. acc += b[i] + b[j + 1] |
  15. return acc |
  16. --------------------------------- Fusing loops ---------------------------------
  17. Attempting fusion of parallel loops (combines loops with similar properties)...
  18. Trying to fuse loops #0 and #1:
  19. - fusion succeeded: parallel for-loop #1 is fused into for-loop #0.
  20. Trying to fuse loops #0 and #3:
  21. - fusion failed: loop dimension mismatched in axis 0\. slice(0, x_size0.1, 1)
  22. != slice(0, $40.4, 1)
  23. ----------------------------- Before Optimization ------------------------------
  24. Parallel region 0:
  25. +--0 (parallel)
  26. +--1 (parallel)
  27. Parallel region 1:
  28. +--3 (parallel)
  29. +--2 (parallel)
  30. --------------------------------------------------------------------------------
  31. ------------------------------ After Optimization ------------------------------
  32. Parallel region 0:
  33. +--0 (parallel, fused with loop(s): 1)
  34. Parallel region 1:
  35. +--3 (parallel)
  36. +--2 (serial)
  37. Parallel region 0 (loop #0) had 1 loop(s) fused.
  38. Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part
  39. of the larger parallel loop (#3).
  40. --------------------------------------------------------------------------------
  41. --------------------------------------------------------------------------------
  42. ---------------------------Loop invariant code motion---------------------------
  43. Instruction hoisting:
  44. loop #0:
  45. Failed to hoist the following:
  46. dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99)
  47. dependency: $0.6.11 = getattr(value=$0.5, attr=sin)
  48. dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None)
  49. dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
  50. dependency: $0.10.20 = getattr(value=$0.9, attr=cos)
  51. dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None)
  52. loop #3:
  53. Has the following hoisted:
  54. $const58.3 = const(int, 1)
  55. $58.4 = _n_23 - $const58.3
  56. --------------------------------------------------------------------------------

为了帮助用户不熟悉使用并行选项时进行的转换,并帮助理解后续章节,提供了以下定义:

    1. Loop fusion

    循环融合是一种技术,其中具有等效边界的循环可以在某些条件下组合以产生具有较大主体的循环(旨在改善数据局部性)。

    1. Loop serialization

    当在另一个prange驱动的回路内存在任意数量的prange驱动回路时,会发生回路串行化。在这种情况下,所有prange循环的最外层并行执行,并且任何内部prange循环(嵌套或其他)被视为基于标准range的循环。实质上,嵌套并行性不会发生。

    1. Loop invariant code motion

    循环不变代码运动是一种优化技术,它分析循环以查找可以移动到循环体外的语句而不改变执行循环的结果,然后这些语句被“提升”出循环保存重复计算。

    1. Allocation hoisting

    分配提升是循环不变代码运动的一种特殊情况,由于一些常见的 NumPy 分配方法的设计,这是可能的。这个技术的解释最好由一个例子驱动:

    1. @njit(parallel=True)
    2. def test(n):
    3. for i in prange(n):
    4. temp = np.zeros((50, 50)) # &lt;--- Allocate a temporary array with np.zeros()
    5. for j in range(50):
    6. temp[j, j] = i
    7. # ...do something with temp

    在内部,这被转换为大致如下:

    1. @njit(parallel=True)
    2. def test(n):
    3. for i in prange(n):
    4. temp = np.empty((50, 50)) # &lt;--- np.zeros() is rewritten as np.empty()
    5. temp[:] = 0 # &lt;--- and then a zero initialisation
    6. for j in range(50):
    7. temp[j, j] = i
    8. # ...do something with temp

    然后吊装后:

    1. @njit(parallel=True)
    2. def test(n):
    3. temp = np.empty((50, 50)) # &lt;--- allocation is hoisted as a loop invariant as `np.empty` is considered pure
    4. for i in prange(n):
    5. temp[:] = 0 # &lt;--- this remains as assignment is a side effect
    6. for j in range(50):
    7. temp[j, j] = i
    8. # ...do something with temp

    可以看出np.zeros分配被分成一个分配和一个赋值,然后分配从i中的循环中提升,这产生了更高效的代码,因为分配只发生一次。

1.10.4.1。并行诊断报告部分

该报告分为以下几个部分:

    1. Code annotation

    这是第一部分,包含带有循环的源代码,循环具有标识和枚举的并行语义。源代码右侧的loop #ID列与已识别的并行循环对齐。从示例中,#0np.sin#1np.cos#2#3prange()

    1. Parallel loop listing for Function test, example.py (4)
    2. --------------------------------------|loop #ID
    3. @njit(parallel=True) |
    4. def test(x): |
    5. n = x.shape[0] |
    6. a = np.sin(x)---------------------| #0
    7. b = np.cos(a * a)-----------------| #1
    8. acc = 0 |
    9. for i in prange(n - 2):-----------| #3
    10. for j in prange(n - 1):-------| #2
    11. acc += b[i] + b[j + 1] |
    12. return acc |

    值得注意的是,循环 ID 按它们被发现的顺序枚举,这不一定与源中存在的顺序相同。此外,还应注意,并行变换使用静态计数器进行循环 ID 索引。因此,由于使用相同的计数器进行对用户不可见的内部优化/变换,循环 ID 索引可能不会从 0 开始。

    1. Fusing loops

    本节介绍在融合发现的循环时所做的尝试,注意哪些成功哪些失败。在未融合的情况下,给出了一个原因(例如,依赖于其他数据)。从示例:

    1. --------------------------------- Fusing loops ---------------------------------
    2. Attempting fusion of parallel loops (combines loops with similar properties)...
    3. Trying to fuse loops #0 and #1:
    4. - fusion succeeded: parallel for-loop #1 is fused into for-loop #0.
    5. Trying to fuse loops #0 and #3:
    6. - fusion failed: loop dimension mismatched in axis 0\. slice(0, x_size0.1, 1)
    7. != slice(0, $40.4, 1)

    可以看出,环#0#1的融合被尝试并且这成功(两者都基于x的相同尺寸)。在#0#1成功融合后,尝试在#0(现在包括融合的#1环)和#3之间进行融合。这种融合失败是因为存在环尺寸不匹配,#0是尺寸x.shape#3是尺寸x.shape[0] - 2

    1. Before Optimization

    本节显示了在进行任何优化之前代码中并行区域的结构,但是具有与其最终并行区域相关联的循环(这是在优化输出之前/之后直接进行比较)。如果存在不能融合的循环,则可能存在多个并行区域,在这种情况下,每个区域内的代码将并行执行,但每个并行区域将顺序运行。从示例:

    1. Parallel region 0:
    2. +--0 (parallel)
    3. +--1 (parallel)
    4. Parallel region 1:
    5. +--3 (parallel)
    6. +--2 (parallel)

    正如<cite>融合循环</cite>部分所提到的,代码中必然存在两个并行区域。第一个包含循环#0#1,第二个包含#3#2,所有循环都标记为parallel,因为尚未进行优化。

    1. After Optimization

    本节显示优化发生后代码中并行区域的结构。同样,平行区域用它们相应的循环枚举,但是记录融合或序列化的这个时间循环并给出摘要。从示例:

    1. Parallel region 0:
    2. +--0 (parallel, fused with loop(s): 1)
    3. Parallel region 1:
    4. +--3 (parallel)
    5. +--2 (serial)
    6. Parallel region 0 (loop #0) had 1 loop(s) fused.
    7. Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part
    8. of the larger parallel loop (#3).

    可以注意到,并行区域 0 包含循环#0,并且如<cite>定影循环</cite>部分所示,循环#1融合到循环#0中。还可以注意到,并行区域 1 包含循环#3并且该循环#2(内部prange())已被序列化以在循环体#3中执行。

    1. Loop invariant code motion

    此部分显示优化发生后的每个循环:

    • 未能提升的指示和失败的原因(依赖/不纯)。
    • 悬挂的指示。
    • 任何可能发生的分配吊装。

    从示例:

    1. Instruction hoisting:
    2. loop #0:
    3. Failed to hoist the following:
    4. dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99)
    5. dependency: $0.6.11 = getattr(value=$0.5, attr=sin)
    6. dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None)
    7. dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
    8. dependency: $0.10.20 = getattr(value=$0.9, attr=cos)
    9. dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None)
    10. loop #3:
    11. Has the following hoisted:
    12. $const58.3 = const(int, 1)
    13. $58.4 = _n_23 - $const58.3

    首先要注意的是,此信息适用于高级用户,因为它指的是正在转换的函数的 Numba IR 。例如,示例源中的表达式a * a部分转换为 IR 中的表达式$arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9,这显然无法从loop #0中提升,因为它不是循环不变的!而在loop #3中,表达式$const58.3 = const(int, 1)来自源b[j + 1],数字1显然是一个常数,因此可以从循环中提升。

也可以看看

并行并行常见问题解答