首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >管道流中的并行处理

管道流中的并行处理
EN

Stack Overflow用户
提问于 2014-11-05 02:23:51
回答 1查看 1.2K关注 0票数 6

我真的很喜欢将操作应用到流IO源的管道/管道的概念。我对构建处理非常大的日志文件的工具很感兴趣。从Python/Ruby迁移到Haskell的一个吸引人的地方是更容易编写并行代码,但我找不到任何有关这方面的文档。我如何设置一个从文件中读取行并并行处理它们的管道流(即。对于8个核心,它应该读取8行代码,并将它们传递给8个不同的线程进行处理,然后重新收集(等等),理想情况下尽可能少地“仪式”……

可选的是,是否需要按顺序重新连接这些行,如果这会影响过程的速度?

我确信可以使用Parallel Haskell一书中的想法自己拼凑出一些东西,但在我看来,在管道工作流中间并行运行一个纯函数(parmap等)应该是非常容易的?

EN

回答 1

Stack Overflow用户

发布于 2014-11-05 15:55:22

作为Petr Pudlák在他的评论中提到的“内部并行性”的一个例子,考虑这个函数(我使用的是pipes,但也可以用conduit实现):

代码语言:javascript
复制
import Control.Monad
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L

concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer = 
      L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
      >->
      forever (await >>= liftIO . mapConcurrently action >>= mapM G.yield) 

此函数将组大小、我们希望为a类型的每个值运行的操作以及a值的Producer作为参数。

它返回一个新的Producer。在内部,生产者以批处理groupsize的方式读取a值,并发地处理它们,并逐个产生结果。

代码使用Pipes.Group将原始生产者“划分”成大小为groupsize的子生产者,然后使用Control.Foldl将每个子生产者“折叠”到一个列表中。

对于更复杂的任务,您可以使用pipes-concurrencystm-conduit提供的异步通道。但是这些让你从“单一管道”的普通管道/导管的世界观中抽身出来。

票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/26742276

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档