Why does this TCP server exit immediately?
Stack Overflow user
Asked on 2020-07-16 04:12:45
1 answer · 102 views · 0 followers · 3 votes

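This is extracted from a larger project, which does not seem to have the problem of the server returning immediately. (Admittedly, I extracted it in the first place hoping to ask a different question about accept failing, so the code may have other problems.)

I don't believe running with fewer threads (well, one thread) should be a problem, but TCP.serve seems to return silently: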

starting tcp server
exgetting protobuf port
iting serveTBQ
tcp server exited

The expected behavior is that it keeps running, listening on the specified port (getPort).

Here is the self-contained example code:

#!/usr/bin/env stack
{- stack script --nix --resolver lts-14.27
  --nix-packages zlib
  --no-nix-pure
  --package bytestring
  --package classy-prelude
  --package conduit
  --package exceptions
  --package mtl
  --package network
  --package network-simple
  --package stm
  --package stm-conduit
  --package text
  --package unliftio
  --ghc-options -Wall
-}

-- Use --verbose above for better error messages for library build failures
--  --package refined
--   --extra-dep unexceptionalio-0.5.1

{-# LANGUAGE NoImplicitPrelude #-}

{-# LANGUAGE RankNTypes                  #-}
{-# LANGUAGE ScopedTypeVariables         #-}

module Main where

import           ClassyPrelude                    hiding (hClose)
import           Conduit
import           Control.Concurrent.STM.TBQueue   (TBQueue, writeTBQueue)
import           Control.Monad.Catch              (MonadMask)
import           Control.Monad.Writer
import           Data.Bits                        (shiftR, (.&.))
import qualified Data.ByteString.Char8            as B
import           Data.Conduit.Async               (gatherFrom)
import qualified Data.Conduit.List                as CL
import           Data.Function                    ((&))
import qualified Data.Text                        as T
import           GHC.IO.Handle                    (Handle, hClose)
import qualified Network.Simple.TCP               as TCP
import qualified Network.Socket                   as NS
import           UnliftIO.Concurrent              (ThreadId, forkIO, threadDelay)

type Error = [String]
type Result r = Writer Error r

runResult :: Result r -> (r, Error)
runResult = runWriter

getPort :: NS.ServiceName
getPort = "29876"

-- | This signature is meant to simulate the same function from the proto-lens library,
-- | but without dealing with protobufs for binary data.
decodeMessageDelimitedH :: Handle -> IO (Either String String)
decodeMessageDelimitedH h = do
    sOut <- B.hGetLine h
    pure $ Right $ B.unpack sOut

protoServe :: forall m. (MonadMask m, MonadResource m, MonadUnliftIO m) =>
     (String -> Result [String])
  -> ConduitT () [String] m ()
protoServe fromProto = start .| mapMC logFilterRead
  .| CL.catMaybes .| mapMC msgToRecs
  where
    port = trace "getting protobuf port" getPort
    start = do
      let enQserver = serveTBQ (TCP.HostIPv4) port (decodeProto . fst)
      gatherFrom 10000 enQserver
    decodeProto :: NS.Socket -> m (Either String String)
    decodeProto sock = bracket
      connHandleIO
      (liftIO . hClose)
      (liftIO . decodeMessageDelimitedH)
      where
        connHandleIO :: m Handle
        connHandleIO = liftIO $ sockToHandle sock
    logFilterRead :: Either String String -> m (Maybe String)
    logFilterRead pEi = case pEi of
      Right p -> pure $ Just p
      Left err -> trace err $ pure Nothing
    msgToRecs :: String -> m [String]
    msgToRecs p = case runResult $ fromProto p of
      (rs, rErr) -> do
        when (not $ null rErr) $ pure $ trace (intercalate "\n" rErr) ()
        pure $ trace "completed msgToRecs" rs

-- | The handle only needs a read-view of the socket.  Note that a TBQueue is
-- | mutable but has STM's runtime safety checks in place.
sockToHandle :: NS.Socket -> IO Handle
sockToHandle sock = NS.socketToHandle sock ReadMode

-- | Based on serve and listen from Network.Simple.TCP
-- | Unlike `serve`, which never returns, `serveTBQ` immediately returns
-- | a `TBQueue` of results.
serveTBQ :: forall a m. (MonadMask m, MonadUnliftIO m)
  => TCP.HostPreference -- ^ Host to bind.
  -> NS.ServiceName -- ^ Server service port name or number to bind.
  -> ((NS.Socket, NS.SockAddr) -> m a)
  -- ^ Computation to run in a different thread once an incoming connection is
  -- accepted. Takes the connection socket and remote end address.
  -> TBQueue a -- ^ enqueue computation results to this queue
  -> m ()
  -- ^ Returns a FIFO (queue) of results from concurrent requests
serveTBQ hp port rFun tbq = do
    _ <- async $ withRunInIO $ \run -> TCP.serve hp port $ \(lsock, _) -> do
      run $ void $ acceptTBQ lsock rFun tbq
    putStrLn $ T.pack "exiting serveTBQ"

-- | Based on acceptFork from Network.Simple.TCP.
acceptTBQ :: forall a m.
  MonadUnliftIO m
  => NS.Socket -- ^ Listening and bound socket.
  -> ((NS.Socket, NS.SockAddr) -> m a)
  -- ^ Computation to run in a different thread once an incoming connection is
  -- accepted. Takes the connection socket and remote end address.
  -> TBQueue a
  -> m ThreadId
acceptTBQ lsock rFun tbq = mask $ \restore -> do
  (csock, addr) <- trace ("running restore-accept on lsock: " <> (show lsock)) $ restore (liftIO $ NS.accept lsock)
  onException (forkIO $ finally
    (restore $ do
      rVal <- trace "retrieved rVal in finally-restore" rFun (csock, addr)
      atomically $ writeTBQueue tbq rVal)
    (TCP.closeSock csock))
    (TCP.closeSock csock)

retryForever :: forall m a. MonadUnliftIO m => m a -> m a
retryForever prog = catchAny prog progRetry
  where
    progRetry :: SomeException -> m a
    progRetry ex = do
      putStrLn $ pack $ show ex
      threadDelay 4000000
      retryForever prog

-- | Safer interface to sinkNull
sinkUnits :: MonadResource m => ConduitT () Void m ()
sinkUnits = sinkNull

main :: IO ()
main = retryForever $ do
  putStrLn $ T.pack "starting tcp server"
  let myProtoServe = protoServe (pure . words)
  myProtoServe .| mapMC (putStrLn . T.pack . intercalate "_") .| sinkUnits & runConduitRes
  putStrLn $ T.pack "tcp server exited"

Perhaps there is a way to make this work with multiple threads while still keeping it a stack script? See "multicore parallelism with stack runghc".

1 Answer

Stack Overflow user

Answered on 2020-07-17 22:35:14

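In this case, the extracted server example terminates because the program itself reaches the end of main and exits, which kills all other threads (including the one running the server). In my actual application, the main thread already had loops that prevented this from happening.

So simply adding the following and calling it at the end of the main IO action is enough: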
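
The behaviour described above can be reproduced without any networking. The following is only a minimal sketch using base; `fakeServer` and `runInBackgroundFor` are hypothetical names standing in for the real server thread, not part of the original script. It shows a forked thread that would loop forever, and a main thread that returns regardless:

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical stand-in for the server: loops forever, counting iterations.
fakeServer :: IORef Int -> IO ()
fakeServer ticks = forever $ do
  atomicModifyIORef' ticks (\n -> (n + 1, ()))
  threadDelay 100000 -- 0.1 s between iterations

-- | Fork the fake server, let it run for the given number of microseconds,
-- | then return the iteration count WITHOUT stopping the background thread.
runInBackgroundFor :: Int -> IO Int
runInBackgroundFor micros = do
  ticks <- newIORef 0
  _ <- forkIO (fakeServer ticks)
  threadDelay micros
  readIORef ticks

main :: IO ()
main = do
  n <- runInBackgroundFor 350000
  putStrLn ("main: returning after " ++ show n ++ " server iterations")
  -- fakeServer is still looping here, but the process exits anyway,
  -- because the GHC runtime ends the program when the main thread returns.
```

The program terminates shortly after printing its one line, even though `fakeServer` never finishes by itself. This mirrors what happens in the question once the conduit pipeline in `main` completes.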

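-- | Block the calling thread indefinitely so that background threads
-- | (such as the one running the server) are not killed by main returning.
waitForever :: IO ()
waitForever = do
  threadDelay 10000 -- microseconds, i.e. 10 ms per iteration
  waitForever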
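
As a possible alternative to sleeping in a loop, the main thread can block until the background thread actually finishes. The sketch below uses only base (`forkFinally` and an `MVar`); `forkWithDone` is a hypothetical helper name, and the worker is a stand-in for the server rather than the code from the question:

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException)

-- | Run an action in a background thread and return an MVar that is filled
-- | when the action finishes, either normally or with an exception.
forkWithDone :: IO a -> IO (MVar (Either SomeException a))
forkWithDone action = do
  done <- newEmptyMVar
  _ <- forkFinally action (putMVar done)
  pure done

main :: IO ()
main = do
  -- Hypothetical stand-in for the server; a real server would not return.
  done <- forkWithDone (putStrLn "worker: pretending to serve" >> pure (42 :: Int))
  result <- takeMVar done -- blocks without polling until the worker ends
  case result of
    Left err -> putStrLn ("worker failed: " ++ show err)
    Right n  -> putStrLn ("worker finished with " ++ show n)
```

With a server that never returns, `takeMVar` would simply block for as long as the server thread is alive, and a crash in that thread would surface as a `Left` value instead of going unnoticed. In the question's script, which discards the result of `async`, keeping that handle and calling `wait` on it should have a similar effect, though that variant is untested here.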

Thanks to @ProofOfKeags on Slack for the hint.

Votes: 0
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/62923078
