我有一个简单的例程,它使用Double向量的乘积。我正在尝试并行化这段代码,但是很多火花最终都失败了。下面是一个自成体系的基准测试,它也提供了作为要旨。
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MagicHash #-}
{-# OPTIONS_GHC -O2 -Wall -threaded -fforce-recomp #-}
import Criterion.Main
import Control.Monad (when)
import Control.Parallel.Strategies (runEval,rpar,rseq)
import qualified Data.Vector.Primitive as PV
main :: IO ()
main = do
let expected = PV.product numbers
when (not (serialProduct numbers == expected)) $ do
fail "serialProduct implementation incorrect"
defaultMain
[ bgroup "product"
[ bench "serial" $ whnf serialProduct numbers
, bench "parallel" $ whnf parallelProduct numbers
]
]
numbers :: PV.Vector Double
numbers = PV.replicate 10000000 1.00000001
{-# NOINLINE numbers #-}
serialProduct :: PV.Vector Double -> Double
serialProduct v =
let !len = PV.length v
go :: Double -> Int -> Double
go !d !ix = if ix < len then go (d * PV.unsafeIndex v ix) (ix + 1) else d
in go 1.0 0
-- | This only works when the vector length is a multiple of 8.
parallelProduct :: PV.Vector Double -> Double
parallelProduct v = runEval $ do
let chunk = div (PV.length v) 8
p2 <- rpar (serialProduct (PV.slice (chunk * 6) chunk v))
p3 <- rpar (serialProduct (PV.slice (chunk * 7) chunk v))
p1 <- rseq (serialProduct (PV.slice (chunk * 0) (chunk * 6) v))
return (p1 * p2 * p3)这可以通过以下方式构建和运行:
ghc -threaded parallel_compute.hs
./parallel_compute +RTS -N4 -s我有一个八核盒,所以给运行时提供四个功能应该可以。基准结果并不是非常重要,但如下所示:
benchmarking product/serial
time 11.40 ms (11.30 ms .. 11.53 ms)
0.999 R² (0.998 R² .. 1.000 R²)
mean 11.43 ms (11.37 ms .. 11.50 ms)
std dev 167.2 μs (120.4 μs .. 210.1 μs)
benchmarking product/parallel
time 10.03 ms (9.949 ms .. 10.15 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 10.17 ms (10.11 ms .. 10.31 ms)
std dev 235.7 μs (133.4 μs .. 426.2 μs)现在,运行时统计数据。这就是我困惑的地方:
124,508,840 bytes allocated in the heap
529,843,176 bytes copied during GC
80,232,008 bytes maximum residency (8344 sample(s))
901,272 bytes maximum slop
83 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 19 colls, 19 par 0.008s 0.001s 0.0001s 0.0003s
Gen 1 8344 colls, 8343 par 2.916s 1.388s 0.0002s 0.0008s
Parallel GC work balance: 76.45% (serial 0%, perfect 100%)
TASKS: 13 (1 bound, 12 peak workers (12 total), using -N4)
SPARKS: 1024 (502 converted, 0 overflowed, 0 dud, 28 GC'd, 494 fizzled)
INIT time 0.000s ( 0.002s elapsed)
MUT time 11.480s ( 10.414s elapsed)
GC time 2.924s ( 1.389s elapsed)
EXIT time 0.004s ( 0.005s elapsed)
Total time 14.408s ( 11.811s elapsed)
Alloc rate 10,845,717 bytes per MUT second
Productivity 79.7% of total user, 88.2% of total elapsed在关于火花的章节中,我们可以看到大约有一半的火花失败了。这在我看来是难以置信的。在parallelProduct中,我们的主线程工作的任务比任何一个火花都大6倍。然而,似乎这些火花之一总是会失效(或GCed)。这也不是一份小工作。我们讨论的是一种耗时数毫秒的计算,因此主线程在其他线程被触发之前完成它似乎是不可能的。
我的理解(这可能是完全错误的)是,这种计算应该是理想的并发运行时。垃圾收集似乎是GHC中并发应用程序的最大问题,但是我在这里所做的任务并不会产生几乎任何垃圾,因为GHC将serialProduct的内部变成了一个紧密的循环,所有的东西都没有装箱。
好的一面是,我们do在基准测试中看到了并行版本的11%的加速。因此,成功引发的第八部分工作确实产生了可衡量的影响。我只是想知道为什么另一种火花不像我期望的那样起作用。
如能对此有任何了解,将不胜感激。
编辑
我更新了要旨以包括另一个实现:
-- | This only works when the vector length is a multiple of 4.
parallelProductFork :: PV.Vector Double -> Double
parallelProductFork v = unsafePerformIO $ do
let chunk = div (PV.length v) 4
var <- newEmptyMVar
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 0) chunk v)) >>= putMVar var
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 1) chunk v)) >>= putMVar var
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 2) chunk v)) >>= putMVar var
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 3) chunk v)) >>= putMVar var
a <- takeMVar var
b <- takeMVar var
c <- takeMVar var
d <- takeMVar var
return (a * b * c * d)这一台的性能很好:
benchmarking product/parallel mvar
time 3.814 ms (3.669 ms .. 3.946 ms)
0.986 R² (0.977 R² .. 0.992 R²)
mean 3.818 ms (3.708 ms .. 3.964 ms)
std dev 385.6 μs (317.1 μs .. 439.8 μs)
variance introduced by outliers: 64% (severely inflated)但是,它回到了传统的并发原语上,而不是使用sparks。我不喜欢这个解决方案,但我提供它作为证据,它应该有可能达到同样的性能与火花为基础的方法。
https://stackoverflow.com/questions/46586941
复制相似问题