首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多输入熔断管道

多输入熔断管道
EN

Stack Overflow用户
提问于 2013-03-24 02:37:15
回答 1查看 1.1K关注 0票数 9

我正在尝试创建一个可以消耗多个输入流的管道。我需要能够等待一个或另一个输入流以不特定的顺序(例如,不交替)使压缩无用。这里没有任何平行或不确定的事情发生:我在一条或另一条河流上等待。我希望能够编写类似于以下内容的代码(其中awaitAawaitB分别等待第一个或第二个输入流):

代码语言:javascript
复制
do
  _ <- awaitA
  x <- awaitA
  y <- awaitB
  yield (x,y)
  _ <- awaitB
  _ <- awaitB
  y' <- awaitB
  yield (x,y')

我所拥有的最好的解决方案是把内部的单元组变成另一条管道。

代码语言:javascript
复制
foo :: Sink i1 (ConduitM i2 o m) ()

然后允许

代码语言:javascript
复制
awaitA = await
awaitB = lift await

这主要是有用的。不幸的是,这似乎使得在外部管道完全连接之前很难与内部管道融合。我尝试的第一件事是:

代码语言:javascript
复制
fuseInner :: Monad m =>
                Conduit i2' m i2 -> 
                Sink i1 (ConduitM i2 o m) () -> 
                Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)

但是这不起作用,至少当x是有状态的,因为(x =$=)多次运行时,每次都有效地重新启动x

除了闯入管道内部(看起来会很混乱),有没有办法编写fuseInner呢?是否有更好的方法来处理多个输入流?难道我只是想超越管道设计的范围吗?

谢谢!

EN

回答 1

Stack Overflow用户

发布于 2013-03-24 09:45:09

如果您想要组合两个IO-generated流,那么加布里埃尔的评论就是解决方案。

否则,您不能等待这两个流,哪个流首先生成一个值。管道是单线程和确定性的-它一次只处理一个管道。但是您可以创建一个将两个流交织在一起的函数,让它们决定何时切换:

代码语言:javascript
复制
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
import Control.Monad (liftM)
import Data.Conduit.Internal (
    Pipe (..), Source, Sink,
    injectLeftovers, ConduitM (..),
    mapOutput, mapOutputMaybe
  )

-- | Alternate two given sources, running one until it yields `Nothing`,
-- then switching to the other one.
merge :: Monad m
      => Source m (Maybe a)
      -> Source m (Maybe b)
      -> Source m (Either a b)
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r
  where
    goL :: Monad m => Pipe () () (Maybe a) () m () 
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goL (Leftover l ()) r           = goL l r
    goL (NeedInput _ c) r           = goL (c ()) r
    goL (PipeM mx) r                = PipeM $ liftM (`goL` r) mx
    goL (Done _) r                  = mapOutputMaybe (liftM Right) r
    goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o)
    goL (HaveOutput c f Nothing) r  = goR c r
    -- This is just a mirror copy of goL. We should combine them together to
    -- avoid code repetition.
    goR :: Monad m => Pipe () () (Maybe a) () m ()
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goR l (Leftover r ())           = goR l r
    goR l (NeedInput _ c)           = goR l (c ())
    goR l (PipeM mx)                = PipeM $ liftM (goR l) mx
    goR l (Done _)                  = mapOutputMaybe (liftM Left) l
    goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o)
    goR l (HaveOutput c f Nothing)  = goL l c

它处理一个源,直到它返回Nothing,然后切换到另一个源,等等。如果一个源完成,另一个源被处理到最后。

例如,我们可以组合和交织两个列表:

代码语言:javascript
复制
import Control.Monad.Trans
import Data.Conduit (($$), awaitForever)
import Data.Conduit.List (sourceList)

main =  (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [  1..10])
               (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110]) )
         $$ awaitForever (\x -> lift $ print x)

如果您需要多个源,则可以将merge调整为类似的

代码语言:javascript
复制
mergeList :: Monad m => [Source m (Maybe a)] -> Source m a

这将循环通过给定的资源列表,直到它们全部完成。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/15594556

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档