Why does this Haskell program hang when writing to a file?

Stack Overflow user
Asked 2016-08-01 18:40:22
Answers: 1, Views: 300, Followers: 0, Score: 1

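The program below works when run with runhaskell or when compiled, but not when compiled with -O2: with -O2 it appears to hang.

I'm using GHC 7.10.2.

I have set the minimum and maximum iteration counts to 10 and 20, respectively. With these settings the program writes 20 to 100 MB of output to the file test.out and runs for roughly 15 to 60 seconds.

Program explanation

Below is a multithreaded program with a pool of worker threads and a manager thread. The workers generate traces for drawing a Buddhabrot and put them into a queue; the manager periodically empties the queue and writes the data to disk. The program stops once a certain amount of data has been generated.

However, when the program runs, the manager thread performs only one check and then gets stuck (the worker threads keep running). If I remove the part where the manager thread writes to the file, everything seems to work. I just don't understand why.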

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
  ( forever
  , unless
  )
import Control.Monad.Loops
import System.IO
import System.Random

import qualified Data.Binary as B
import qualified Data.ByteString.Lazy as BS

type Coord = (Double, Double)

type Trace = [Coord]

-- | Represents a rectangle in the complex plane, bounded by a lower left
-- coordinate and an upper right coordinate.
data Plane
  = Plane { ll :: Coord, ur :: Coord }
  deriving (Show)

-- | Adds two coordinates.
(+.) :: Coord -> Coord -> Coord
(r1, i1) +. (r2, i2) = (r1 + r2, i1 + i2)

-- | Multiplies two coordinates.
(*.) :: Coord -> Coord -> Coord
(r1, i1) *. (r2, i2) = (r1*r2 - i1*i2, r1*i2 + r2*i1)

-- | Computes the square of a coordinate.
square :: Coord -> Coord
square (r, i) = (r*r - i*i, 2*r*i)

-- | Squared distance from the origin to a given coordinate.
distFromOrigin :: Coord -> Double
distFromOrigin (r, i) = r*r + i*i

-- | A structure for passing data to the worker threads.
data WorkerData
  = WorkerData { wdMinIt :: Int
               , wdMaxIt :: Int
               , wdTraceQueue :: TQueue Trace
                 -- ^ A queue of traces to be written to disk.
               }

-- | A structure for passing data to the manager thread.
data ManagerData
  = ManagerData { mdOutHandle :: Handle
                   -- ^ Handle to the output file.
                , mdNumTraces :: Integer
                  -- ^ Number of traces to gather.
                , mdTraceQueue :: TQueue Trace
                  -- ^ A queue of traces to be written to disk.
                }

-- | Encodes an entity to binary bytestring.
encode :: B.Binary a => a -> BS.ByteString
encode = B.encode

-- | Writes a lazy bytestring to file.
writeToFile :: Handle -> BS.ByteString -> IO ()
writeToFile = BS.hPut

mkManagerData :: TQueue Trace -> IO ManagerData
mkManagerData t_queue =
  do let out_f = "test.out"
     out_h <- openBinaryFile out_f WriteMode
     let num_t = 1000
     return $ ManagerData { mdOutHandle = out_h
                          , mdNumTraces = num_t
                          , mdTraceQueue = t_queue
                          }

mkWorkerData :: TQueue Trace -> IO WorkerData
mkWorkerData t_queue =
  do let min_it =  10 -- 1000
         max_it =  20 -- 10000
     return $ WorkerData { wdMinIt = min_it
                         , wdMaxIt = max_it
                         , wdTraceQueue = t_queue
                         }

-- | The actions to be performed by the manager thread.
runManager :: ManagerData -> IO ()
runManager m_data =
  do execute 0
     return ()
  where execute count =
          do new_traces <- purgeTQueue $ mdTraceQueue m_data
             let new_count = count + (toInteger $ length new_traces)
             putStrLn $ "Found " ++ (show $ new_count) ++ " traces so far. "
             if length new_traces > 0
             then do putStrLn $ "Writing new traces to file..."
                     _ <- mapM (writeToFile (mdOutHandle m_data))
                               (map encode new_traces)
                     putStr "Done"
             else return ()
             putStrLn ""
             unless (new_count >= mdNumTraces m_data) $
               do threadDelay (1000 * 1000) -- Sleep 1s
                  execute new_count

-- | The actions to be performed by a worker thread.
runWorker :: WorkerData -> IO ()
runWorker w_data =
  forever $
    do c <- randomCoord
       case computeTrace c (wdMinIt w_data) (wdMaxIt w_data) of
         Just t  -> atomically $ writeTQueue (wdTraceQueue w_data) t
         Nothing -> return ()

-- | Reads all values from a given 'TQueue'. If any other thread reads from the
-- same 'TQueue' during the execution of this function, then this function may
-- deadlock.
purgeTQueue :: Show a => TQueue a -> IO [a]
purgeTQueue q =
  whileJust (atomically $ tryReadTQueue q)
            (return . id)

-- | Generates a random coordinate to trace.
randomCoord :: IO Coord
randomCoord =
  do x <- randomRIO (-2.102613, 1.200613)
     y <- randomRIO (-1.237710, 1.239710)
     return (x, y)

-- | Computes a trace, using the classical Mandelbrot function, for a given
-- coordinate and minimum and maximum iteration count. If the length of the
-- trace is less than the minimum iteration count, or exceeds the maximum
-- iteration count, 'Nothing' is returned.
computeTrace
  :: Coord
  -> Int
     -- ^ Minimum iteration count.
  -> Int
     -- ^ Maximum iteration count.
  -> Maybe Trace
computeTrace c0 min_it max_it =
  if isUsefulCoord c0
  then let step c = square c +. c0
           computeIt c it = if it < max_it
                            then computeIt (step c) (it + 1)
                            else it
           computeTr [] = error "computeTr: empty list"
           computeTr (c:cs) = if length cs < max_it
                              then computeTr (step c:(c:cs))
                              else (c:cs)
           num_it = computeIt c0 0
       in if num_it >= min_it && num_it <= max_it
          then Just $ reverse $ computeTr [c0]
          else Nothing
  else Nothing

-- | Checks whether a given coordinate is useful, i.e. whether it lies outside
-- both the main cardioid and the period-2 bulb of the Mandelbrot set.
isUsefulCoord :: Coord -> Bool
isUsefulCoord (x, y) =
  let t1 = x - 1/4
      p = sqrt (t1*t1 + y*y)
      is_in_cardioid = x < p - 2*p*p + 1/4
      t2 = x + 1
      is_in_bulb = t2*t2 + y*y < 1/16
  in not is_in_cardioid && not is_in_bulb

main :: IO ()
main =
  do t_queue <- newTQueueIO
     m_data <- mkManagerData  t_queue
     w_data <- mkWorkerData t_queue
     let num_workers = 1
     workers <- mapM async (replicate num_workers (runWorker w_data))
     runManager m_data
     _ <- mapM cancel workers
     _ <- mapM waitCatch workers
     putStrLn "Tracing finished"
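As posted, computeIt counts up to max_it without ever testing whether the orbit has escaped, so num_it always equals max_it and every coordinate that passes isUsefulCoord yields a trace (distFromOrigin is defined but never used). Below is a minimal sketch of the escape-time check that the comment on computeTrace appears to describe. It assumes the usual bailout |z|^2 > 4 and assumes that orbits which do not escape within max_it steps are discarded; computeTraceEscaping is a hypothetical name, and the isUsefulCoord pre-check can still be applied before calling it.

```haskell
type Coord = (Double, Double)
type Trace = [Coord]

-- | Adds two coordinates (as in the question).
(+.) :: Coord -> Coord -> Coord
(r1, i1) +. (r2, i2) = (r1 + r2, i1 + i2)

-- | Squares a coordinate (as in the question).
square :: Coord -> Coord
square (r, i) = (r*r - i*i, 2*r*i)

-- | Squared distance from the origin (the question's distFromOrigin).
distFromOrigin :: Coord -> Double
distFromOrigin (r, i) = r*r + i*i

-- | Hypothetical escape-time variant: iterate z -> z^2 + c0 starting at z = c0.
-- Stops when |z|^2 > 4 (assumed bailout) or after max_it steps.
-- Returns the orbit from c0 up to, but not including, the first point past
-- the bailout, and only if the escape took at least min_it steps.
computeTraceEscaping :: Coord -> Int -> Int -> Maybe Trace
computeTraceEscaping c0 min_it max_it = go c0 0 []
  where
    go c it acc
      | distFromOrigin c > 4 = if it >= min_it then Just (reverse acc) else Nothing
      | it >= max_it = Nothing  -- never escaped: treated as bounded, discarded
      | otherwise = go (square c +. c0) (it + 1) (c : acc)
```

For example, computeTraceEscaping (1, 0) 0 10 gives Just [(1.0,0.0),(2.0,0.0)], because the next iterate, 5, is past the bailout; computeTraceEscaping (0, 0) 0 10 gives Nothing.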

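Why it failed

After reviewing the answer below, I finally realized why the program does not work as expected. It does not actually hang; instead, the manager thread takes tens of seconds to encode a single trace (and consumes several megabytes while encoding)! This means that, on my machine, even when the queue holds only a few dozen traces, the workers manage to produce about 250 more before the manager thread has drained the queue, and it then takes a very long time before the next drain.

So it doesn't matter which solution I choose unless the manager thread's workload is greatly reduced. To achieve that, I will have to abandon the idea of dumping each individual trace to the file and instead process each trace after it has been generated.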
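One way to process each trace right after it is generated, instead of encoding it to disk, is to fold its points into an in-memory hit-count histogram, which is how a Buddhabrot image is usually accumulated. This is a sketch under assumptions: the pixel grid size (w, h) and the names toPixel, addTrace and Histogram are hypothetical, and Plane mirrors the question's type. Points outside the plane, including Infinity and NaN, are dropped.

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as M

type Coord = (Double, Double)
type Trace = [Coord]

-- | Same shape as the question's Plane: lower-left and upper-right corners.
data Plane = Plane { ll :: Coord, ur :: Coord }
  deriving (Show)

-- | Hypothetical histogram: hit count per (column, row) pixel.
type Histogram = M.Map (Int, Int) Int

-- | Maps a coordinate onto a w-by-h pixel grid covering the plane.
-- The range test is written positively, so NaN coordinates yield Nothing.
toPixel :: Int -> Int -> Plane -> Coord -> Maybe (Int, Int)
toPixel w h (Plane (x0, y0) (x1, y1)) (x, y)
  | x >= x0 && x < x1 && y >= y0 && y < y1 = Just (px, py)
  | otherwise = Nothing
  where
    -- min guards against rounding pushing a point just below x1 (or y1) onto index w (or h).
    px = min (w - 1) (floor ((x - x0) / (x1 - x0) * fromIntegral w))
    py = min (h - 1) (floor ((y - y0) / (y1 - y0) * fromIntegral h))

-- | Folds one trace into the histogram, incrementing every pixel it visits.
addTrace :: Int -> Int -> Plane -> Histogram -> Trace -> Histogram
addTrace w h plane = foldl' bump
  where
    bump hist c = case toPixel w h plane c of
      Nothing -> hist
      Just p -> M.insertWith (+) p 1 hist
```

With this approach each worker could keep its own Histogram, and the manager would only merge them at the end, for example with M.unionWith (+), so no per-trace encoding or file writing is needed.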


1 Answer

Stack Overflow user

Accepted answer

Answered 2016-08-02 05:44:03

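The problem is twofold:

(1) The manager thread does not process any traces until it has drained the queue.

(2) The worker threads can add elements to the queue very, very quickly.

This results in a race that the manager thread rarely wins. It also explains the behavior observed with -O2: optimization makes the worker threads faster.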
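Point (2) can be illustrated in isolation. The answer keeps the unbounded TQueue; a bounded TBQueue is a different technique, shown here only as a sketch: writeTBQueue blocks while the queue is full, so a fast producer can get at most `capacity` items ahead of its consumer. producerConsumer is a hypothetical name, and plain forkIO stands in for async.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM)

-- | Hypothetical demo of backpressure with a bounded queue.
-- A producer thread writes 1..n; the calling thread consumes n items.
-- writeTBQueue retries (blocks) whenever the queue already holds `capacity` items.
producerConsumer :: Int -> Int -> IO [Int]
producerConsumer capacity n = do
  -- fromIntegral: newTBQueueIO takes a Natural in stm >= 2.5 and an Int before that.
  q <- newTBQueueIO (fromIntegral capacity)
  _ <- forkIO $ forM_ [1 .. n] $ \i -> atomically (writeTBQueue q i)
  replicateM n (atomically (readTBQueue q))
```

In the question's program the same change would make runWorker block instead of piling up traces that the manager has not yet written.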

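Adding some debugging code shows that the workers can add more than 100K traces per second to the queue. Moreover, even though the manager is only interested in writing out the first 1000 traces, the workers don't stop there. So in some cases the manager can never exit this loop: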

purgeTQueue q = whileJust (atomically $ tryReadTQueue q) (return . id)
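For contrast, here are two ways a drain can be made to terminate even while workers keep writing. Neither is the answer's fix, and drainSnapshot and drainUpTo are hypothetical names. flushTQueue (requires stm >= 2.4.5) removes everything currently queued in a single transaction; the tryReadTQueue variant simply caps how many items one call reads. Both still let the queue grow without bound if writing is slower than generating.

```haskell
import Control.Concurrent.STM

-- | Hypothetical alternative to purgeTQueue: take everything that is in the
-- queue right now in one STM transaction. Items written after the snapshot
-- stay queued for the next call, so the workers cannot keep this call running.
drainSnapshot :: TQueue a -> IO [a]
drainSnapshot q = atomically (flushTQueue q)

-- | Hypothetical bounded variant: read at most n items with tryReadTQueue,
-- stopping early if the queue is empty.
drainUpTo :: Int -> TQueue a -> IO [a]
drainUpTo n q
  | n <= 0 = return []
  | otherwise = do
      mx <- atomically (tryReadTQueue q)
      case mx of
        Nothing -> return []
        Just x -> (x :) <$> drainUpTo (n - 1) q
```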

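The simplest way to fix the code is to have the manager thread use readTQueue to read and process one item from the queue at a time. This also blocks the manager thread when the queue is empty, which removes the need for the manager thread to sleep periodically.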

Change purgeTQueue to:

purgeTQueue :: TQueue a -> IO [a]
purgeTQueue q =
  do item <- atomically $ readTQueue q -- blocks while the queue is empty
     return [item]

and remove the threadDelay call from runManager. That fixes the problem.
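Put together, the manager loop after this fix reads, encodes and writes one trace per iteration. Below is a sketch that assumes a hypothetical writeOut parameter in place of the output handle, so the loop can be exercised without a file; in the question's program it would be writeToFile (mdOutHandle m_data). runManagerOneByOne is also a hypothetical name.

```haskell
import Control.Concurrent.STM
import Control.Monad (unless)
import qualified Data.Binary as B
import qualified Data.ByteString.Lazy as BS

type Coord = (Double, Double)
type Trace = [Coord]

-- | Hypothetical manager loop following the answer's fix: handle one trace
-- at a time and stop after `target` traces. writeOut is an assumed parameter
-- standing in for writeToFile applied to the output handle.
runManagerOneByOne :: (BS.ByteString -> IO ()) -> TQueue Trace -> Integer -> IO ()
runManagerOneByOne writeOut q target = go 0
  where
    go count =
      unless (count >= target) $ do
        -- readTQueue retries (blocks) while the queue is empty,
        -- so no threadDelay polling is needed.
        t <- atomically (readTQueue q)
        writeOut (B.encode t)
        go (count + 1)
```

Because the manager never tries to empty the whole queue, the workers can no longer keep it inside a single drain; it stops as soon as it has written the requested number of traces.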

Example code is available in the Lib4.hs module at https://github.com/erantapaa/mandel

Score: 3
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/38705886
