我正在寻找一个函数,它可以执行类似于:
merge :: MonadIO m => [Producer m a] -> Producer m a我快速地看了一下stm-conduit,它看起来很相似,但我不确定它是否符合我的要求:
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
where
loop = do
mmsg <- liftIO $ getMsg chan ack q
case mmsg of
Just (m, e) -> do
yield (m, e)
liftIO $ ackMsg chan (envDeliveryTag e) False
loop
Nothing -> loop
chan = fst $ amqpChan conn正如您所看到的,这个管道生产者在产生消息后会发出一条消息。在一个简单的“单线程”管道中,它工作得很好,消息会到达接收器,然后被加起来。
但是,对于stm-conduit,这种情况可能会发生变化,因为据我所理解,生产者不会等待消息被接收器消耗,相反,它们将并行工作,并且消息可能会过早地被添加。
我对stm-conduit的理解正确吗?
如何将独立的源合并成一个具有良好的单流语义的方法呢?
UPDATE:根据请求更新到一个实际工作的示例的代码(不过,这可能有点麻烦)。
更新2:我认为我所追求的可能是管道源的另一个实例,因此我可以做一些类似于let src = src1 <|> src2的事情。有可能吗?
发布于 2016-02-17 13:24:29
mergeSources in stm-conduit在场景后面维护一个TBMChannel。您的所有源/生产者首先连接到TBMChannel,然后它将创建一个源,尝试从通道FIFO中提取值。
您可以在使用TBMChannel时设置中间mergeSources的界限。假设您将绑定设置为n,那么所有源产生的前n个值将立即转储到TBMChannel和AmqpConn,前提是它没有在AmqpConn端被阻塞,并且您的使用者比源的速度慢(BTW AmqpConn使用无界Control.Concurrent.Chan,因此不会阻塞)。在此之后,TBMChannel已经满了,因此,试图向通道生成值的源将被阻塞。您的使用者从合并的源中逐个获取值,因此在前n个元素之后是顺序的。
为了确保从一开始它是连续的,您可以将绑定设置为1,但是它可能会导致一些性能问题。
发布于 2016-02-17 12:32:12
看看ZipSource,它是一个新类型的包装器,它的Applicative允许您以您想要的方式组合Source。
一旦有了ZipSource,就可以使用zipSources将Traversable中的Source(例如,列表)组合成Source of Traversables。
所需结果类型的唯一区别是,它是值的Source,而不是单个值,但这不应该是什么问题。
https://stackoverflow.com/questions/35454935
复制相似问题