这是一项黑客工作
parVector
但这对我很有用:
import qualified Data.Vector as V
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq
ack :: Int -> Int -> Int
ack 0 n = n+1
ack m 0 = ack (m-1) 1
ack m n = ack (m-1) (ack m (n-1))
main = do
let vec = V.enumFromN 1 1000
let res = (V.map (ack 2) vec) `using` parVector
print res
parVector :: NFData a => Strategy (V.Vector a)
parVector vec = eval vec `seq` Done vec
where
chunkSize = 1
eval v
| vLen == 0 = ()
| vLen <= chunkSize = rnf (v V.! 0) -- FIX this to handle chunks > 1
| otherwise = eval (V.take half v) `par` eval (V.drop half v)
where vLen = V.length v
half = vLen `div` 2
运行此代码:
[tommd@Mavlo Test]$ ghc --make -O2 -threaded t.hs
... dumb warning ...
[tommd@Mavlo Test]$ time ./t +RTS -N1 >/dev/null
real 0m1.962s user 0m1.951s sys 0m0.009s
[tommd@Mavlo Test]$ time ./t +RTS -N2 >/dev/null
real 0m1.119s user 0m2.221s sys 0m0.005s
当我用
Integer
而不是
Int
在类型签名中:
[tommd@Mavlo Test]$ time ./t +RTS -N2 >/dev/null
real 0m4.754s
user 0m9.435s
sys 0m0.028s
[tommd@Mavlo Test]$ time ./t +RTS -N1 >/dev/null
real 0m9.008s
user 0m8.952s
sys 0m0.029s
摇滚!
编辑:一个更接近于以前的尝试的解决方案是更干净(它不使用来自三个独立模块的功能)并且工作得很好:
parVector :: NFData a => Strategy (V.Vector a)
parVector vec =
let vLen = V.length vec
half = vLen `div` 2
minChunk = 10
in if vLen > minChunk
then do
let v1 = V.unsafeSlice 0 half vec
v2 = V.unsafeSlice half (vLen - half) vec
parVector v1
parVector v2
return vec
else
evalChunk (vLen-1) >>
return vec
where
evalChunk 0 = rpar (rdeepseq (vec V.! 0)) >> return vec
evalChunk i = rpar (rdeepseq (vec V.! i)) >> evalChunk (i-1)
从这个解决方案中学到的东西:
-
它使用
Eval
蒙纳德,这是严格的,所以我们肯定会点燃一切(相比于包装东西
let
记住使用爆炸模式)。
-
与您提议的实现相反,(a)不构建新的向量,这是昂贵的(b)
evalChunk
每个元素的力评估
rpar
和
rdeepseq
(我不相信
rpar vec
强制向量的任何元素)。
-
与我的信仰相反,
slice
获取起始索引和长度,而不是起始索引和结束索引。哎呀!
-
我们仍然需要进口
Control.DeepSeq (NFData)
,但我已通过电子邮件发送了库列表以尝试解决该问题。
性能似乎与第一个相似
向量向量
解决方案在这个答案中,所以我不会发布数字。