首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将管道合并为一体

将管道合并为一体
EN

Stack Overflow用户
提问于 2016-02-17 11:07:06
回答 2查看 447关注 0票数 4

我正在寻找一个函数,它可以执行类似于:

代码语言:javascript
复制
merge :: MonadIO m => [Producer m a] -> Producer m a

我快速地看了一下stm-conduit,它看起来很相似,但我不确定它是否符合我的要求:

代码语言:javascript
复制
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing     -> loop
    chan = fst $ amqpChan conn

正如您所看到的,这个管道生产者在产生消息后会发出一条消息。在一个简单的“单线程”管道中,它工作得很好,消息会到达接收器,然后被加起来。

但是,对于stm-conduit,这种情况可能会发生变化,因为据我所理解,生产者不会等待消息被接收器消耗,相反,它们将并行工作,并且消息可能会过早地被添加。

我对stm-conduit的理解正确吗?

如何将独立的源合并成一个具有良好的单流语义的方法呢?

UPDATE:根据请求更新到一个实际工作的示例的代码(不过,这可能有点麻烦)。

更新2:我认为我所追求的可能是管道源的另一个实例,因此我可以做一些类似于let src = src1 <|> src2的事情。有可能吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-02-17 13:24:29

mergeSources in stm-conduit在场景后面维护一个TBMChannel。您的所有源/生产者首先连接到TBMChannel,然后它将创建一个源,尝试从通道FIFO中提取值。

您可以在使用TBMChannel时设置中间mergeSources的界限。假设您将绑定设置为n,那么所有源产生的前n个值将立即转储到TBMChannelAmqpConn,前提是它没有在AmqpConn端被阻塞,并且您的使用者比源的速度慢(BTW AmqpConn使用无界Control.Concurrent.Chan,因此不会阻塞)。在此之后,TBMChannel已经满了,因此,试图向通道生成值的源将被阻塞。您的使用者从合并的源中逐个获取值,因此在前n个元素之后是顺序的。

为了确保从一开始它是连续的,您可以将绑定设置为1,但是它可能会导致一些性能问题。

票数 1
EN

Stack Overflow用户

发布于 2016-02-17 12:32:12

看看ZipSource,它是一个新类型的包装器,它的Applicative允许您以您想要的方式组合Source

一旦有了ZipSource,就可以使用zipSourcesTraversable中的Source(例如,列表)组合成Source of Traversables。

所需结果类型的唯一区别是,它是值的Source,而不是单个值,但这不应该是什么问题。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35454935

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档