DR:第一:
prange
相同的
range
,除非将parallel添加到
jit
,例如
njit(parallel=True)
. 如果您尝试这样做,您将看到一个关于“不支持的减少”的异常——这是因为numba限制了
普朗格
到
“纯”回路
和
“不纯循环”与numba支持的减少
并负责确保它属于用户的这两个类别中的任何一个。
这在以下文件中有明确说明:
numbas
prange
(version 0.42)
:
1.1.2显式并行循环
此代码转换过程的另一个特点是支持显式并行循环。可以使用numba_
普朗格
而不是
范围
指定循环可以并行化。用户需要确保循环没有交叉迭代依赖项,除了支持的缩减。
这些注释所指的“不纯”在该文档中被称为“跨迭代依赖性”。这种“交叉迭代依赖性”是一个在循环之间变化的变量。一个简单的例子是:
def func(n):
a = 0
for i in range(n):
a += 1
return a
这里是变量
a
取决于循环开始前的值
和
执行了多少次循环迭代。这就是“跨迭代依赖”或“不纯”循环的含义。
当显式并行化这样一个循环时,问题是迭代是并行执行的,但是每个迭代都需要知道其他迭代在做什么。不这样做会导致错误的结果。
我们先假设一下
普朗格
会产生4个工人,我们通过
4
作为
n
到函数。一个完全幼稚的实现会做什么?
Worker 1 starts, gets a i = 1 from `prange`, and reads a = 0
Worker 2 starts, gets a i = 2 from `prange`, and reads a = 0
Worker 3 starts, gets a i = 3 from `prange`, and reads a = 0
Worker 1 executed the loop and sets `a = a + 1` (=> 1)
Worker 3 executed the loop and sets `a = a + 1` (=> 1)
Worker 4 starts, gets a i = 4 from `prange`, and reads a = 2
Worker 2 executed the loop and sets `a = a + 1` (=> 1)
Worker 4 executed the loop and sets `a = a + 1` (=> 3)
=> Loop ended, function return 3
不同的工人读、执行和写的顺序
一
可以任意,这只是一个例子。它也可以(偶然)产生正确的结果!这通常被称为
Race condition
.
有什么更复杂的
普朗格
这是否承认存在这样一个跨迭代依赖关系?
有三种选择:
-
只是不要把它与之平行。
-
实现工人共享变量的机制。这里的典型例子有
Locks
(这可能会产生很高的开销)。
-
认识到这是一个可以并行化的约简。
鉴于我对numba文档的理解(再次重复):
用户需要确保循环没有交叉迭代依赖项,除了支持的缩减。
努姆巴:
-
如果是已知的约简,则使用模式将其并行化
-
如果不是已知的约简,则抛出异常
不幸的是,目前还不清楚“支持的削减”是什么。但文档提示,对循环体中的前一个值进行操作的是二进制运算符:
如果变量由二元函数/运算符使用循环体中的前一个值更新,则会自动推断出约简。还原的初始值自动推断为
+=
和
*=
运算符。对于其他函数/运算符,在输入
普朗格
循环。对于标量和任意维数组,支持这种方式的减少。
OP中的代码使用列表作为跨迭代依赖项和调用
list.append
在循环体中。我个人不会打电话
列表追加
一个约简,它不使用二元运算符,所以我假设它很可能
不支持
. 对于其他跨迭代依赖
running
:它正在对上一次迭代的结果使用加法(这很好),但如果它超过了阈值(这可能不好),也会有条件地将其重置为零。
numba提供了检查中间代码(llvm和asm)代码的方法:
dynamic_cumsum.inspect_types()
dynamic_cumsum.inspect_llvm()
dynamic_cumsum.inspect_asm()
但是,即使我已经对结果有了必要的理解,才能对所发出代码的正确性做出任何声明——一般来说,“证明”多线程/进程代码工作正常是非常重要的。考虑到我甚至缺乏LLVM和ASM知识,甚至看它是否试图将其并行化,我实际上无法回答您的特定问题,它会产生什么结果。
回到代码,如前所述,如果使用
parallel=True
因此,我假设numba不与示例中的任何内容并行:
from numba import njit, prange
@njit(parallel=True)
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
dynamic_cumsum(np.ones(100), np.arange(100), 10)
AssertionError: Invalid reduction format
During handling of the above exception, another exception occurred:
LoweringError: Failed in nopython mode pipeline (step: nopython mode backend)
Invalid reduction format
File "<>", line 7:
def dynamic_cumsum(seq, index, max_value):
<source elided>
running = 0
for i in prange(len(seq)):
^
[1] During: lowering "id=2[LoopNest(index_variable = parfor_index.192, range = (0, seq_size0.189, 1))]{56: <ir.Block at <> (10)>, 24: <ir.Block at <> (7)>, 34: <ir.Block at <> (8)>}Var(parfor_index.192, <> (7))" at <> (7)
接下来要说的是:
普朗格
不提供任何速度优势
在这种情况下
超过正常
范围
(因为它不是并行执行的)。因此,在这种情况下,我不会“冒险”潜在的问题和/或混淆读者,因为根据numba文档,它是不受支持的。
from numba import njit, prange
@njit
def p_dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
@njit
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in range(len(seq)): # <-- here is the only change
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
只是一个快速的时间安排,支持我之前的“不快于”声明:
import numpy as np
seq = np.random.randint(0, 100, 10_000_000)
index = np.arange(10_000_000)
max_ = 500
# Correctness and warm-up
assert p_dynamic_cumsum(seq, index, max_) == dynamic_cumsum(seq, index, max_)
%timeit p_dynamic_cumsum(seq, index, max_)
# 468 ms ± 12.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit dynamic_cumsum(seq, index, max_)
# 470 ms ± 9.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)