我真的很喜欢将操作应用到流IO源的管道/管道的概念。我对构建处理非常大的日志文件的工具很感兴趣。从Python/Ruby迁移到Haskell的一个吸引人的地方是更容易编写并行代码,但我找不到任何有关这方面的文档。我如何设置一个从文件中读取行并并行处理它们的管道流(即。对于8个核心,它应该读取8行代码,并将它们传递给8个不同的线程进行处理,然后重新收集(等等),理想情况下尽可能少地“仪式”……
可选的是,是否需要按顺序重新连接这些行,如果这会影响过程的速度?
我确信可以使用Parallel Haskell一书中的想法自己拼凑出一些东西,但在我看来,在管道工作流中间并行运行一个纯函数(parmap等)应该是非常容易的?
发布于 2014-11-05 15:55:22
作为Petr Pudlák在他的评论中提到的“内部并行性”的一个例子,考虑这个函数(我使用的是pipes,但也可以用conduit实现):
import Control.Monad
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L
concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer =
L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
>->
forever (await >>= liftIO . mapConcurrently action >>= mapM G.yield) 此函数将组大小、我们希望为a类型的每个值运行的操作以及a值的Producer作为参数。
它返回一个新的Producer。在内部,生产者以批处理groupsize的方式读取a值,并发地处理它们,并逐个产生结果。
代码使用Pipes.Group将原始生产者“划分”成大小为groupsize的子生产者,然后使用Control.Foldl将每个子生产者“折叠”到一个列表中。
对于更复杂的任务,您可以使用pipes-concurrency或stm-conduit提供的异步通道。但是这些让你从“单一管道”的普通管道/导管的世界观中抽身出来。
https://stackoverflow.com/questions/26742276
复制相似问题