首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用postgresql-simple创建流管道源

使用postgresql-simple创建流管道源
EN

Stack Overflow用户
提问于 2017-01-13 21:10:39
回答 2查看 449关注 0票数 4

postgresql-simple提供流查询功能,例如:

代码语言:javascript
复制
fold 
  :: (FromRow row, ToRow params)
  => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a

我想要创建一个充分利用流的管道源。

代码语言:javascript
复制
mySource :: (FromRow row, Monad m) => Source m row

不幸的是,因为IO出现在一个相反的位置(我认为?)在fold中,我真的很纠结于这些类型。以下类型检查,但在生成值之前折叠整个流。

代码语言:javascript
复制
getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
  where
    foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
    foo cond evt = pure (cond >> C.yield evt)

任何关于如何实现这一点的提示都将不胜感激!谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-01-13 22:10:21

有一种(不太好)的方法

  • 创建一个新的TMChan来接收行
  • foreach_设置为只将行转储到此通道
  • 最后,使用stm-conduit从通道中生成一个源

我没有办法直接测试这一点,但下面的方法应该有效

代码语言:javascript
复制
import Conduit
import Database.PostgreSQL.Simple (foreach_)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically)

mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
mySource connection query = do
  chan <- newTMChanIO
  forEach_ connection query (atomically . writeTMChan chan)
  pure (sourceTMChan chan)

如果我们有forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m ()就好了.

票数 5
EN

Stack Overflow用户

发布于 2017-06-04 02:12:46

下面是对Alec上面的一个修改,它编译并运行。mkPgSource是亚历克在其职务结束时所提到的一般职能。

代码语言:javascript
复制
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.FromRow
import Database.PostgreSQL.Simple.ToRow
import Control.Monad.IO.Class (MonadIO)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, 
closeTMChan, TMChan)
import GHC.Conc (atomically, forkIO)
import Conduit

--closes the channel after action is done to terminate the source
mkPgSource :: (MonadIO m, FromRow r) => ((r -> IO ()) -> IO ()) -> IO (Source m r)
mkPgSource action = do
  chan <- newTMChanIO
  _ <- forkIO $ do action $ atomically . (writeTMChan chan)
               atomically $ closeTMChan chan
  pure $ sourceTMChan chan

sourceQuery :: (ToRow params, FromRow r, MonadIO m) =>
     Connection -> Query -> params -> IO (Source m r)
sourceQuery conn q params = mkPgSource $ forEach conn q params

sourceQuery_ :: (FromRow r, MonadIO m) => Connection -> Query -> IO 
(Source m r)
sourceQuery_ conn q = mkPgSource $ forEach_ conn q
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41643485

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档