我正在处理一个syslog日志文件,每一行都是一个单独的syslog条目,并使用Attoparsec解析器解析该条目。所以我正在使用
fileToBS :: IO Handle -> C.Source (ResourceT IO) BS.ByteString
fileToBS handleMaker = source C.$= bsSplitterConduit
where source = CB.sourceIOHandle handleMaker
bsSplitterConduit = CB.lines以生成系统日志条目流。我正在使用
parseToLogData:: C.Conduit BS.ByteString (ResourceT IO) (Either CATT.ParseError (CATT.PositionRange, LogData))
parseToLogData = CATT.conduitParserEither syslogParser将这些字节串转换为syslog值。Syslog值是从这个解析器生成的(与我自己的一些类型同义词):
syslogParser :: Parser (Priority, Maybe UTCTime, IPAddress, BS.ByteString)
syslogParser = do
pri <- priority <?> "priority parse error"
mbDate <- date <?> "date parse error"
space
srcAddr <- ip
space
msg <- ATT.takeByteString
return LogData{pri = pri, timestamp = mbDate, source = srcAddr, message = "msg"}
priority :: Parser Priority
priority = do
string "<"
digitsString <- takeWhile1 digit
string ">"
return (RawPriority digitsString)
date :: Parser (Maybe UTCTime)
date = do
rawDate <- ATT.take 15
let stringDate = BS.unpack rawDate
let parsedDate = parseTime defaultTimeLocale syslogDateFormat stringDate
return parsedDate
ip :: Parser IPAddress
ip = do
oct0 <- takeWhile1 digit
period
oct1 <- takeWhile1 digit
period
oct2 <- takeWhile1 digit
period
oct3 <- takeWhile1 digit
return (oct0, oct1, oct2, oct3)
--ip = takeWhile1 (\x -> digit x || x == 46)
space = string " "
colon = string ":"
period = string "."
digit test = (test >= 48 && test <= 57)
octet = digit问题出在占用系统日志条目(msg <- ATT.takeByteString)的所有剩余部分的那一行。这个函数不能很好地处理流,因为如果使用可恢复解析器(这是conduit的attoparsec库使用的解析器),它需要一个终止信号。
我曾尝试生成空字节串来修复此行为,但它不能按预期工作(请参阅https://hackage.haskell.org/package/attoparsec-0.12.1.2/docs/Data-Attoparsec-ByteString.html上的增量输入)。它将整个syslog输入文件转换为一个已解析的值。这是一个80MB的测试文件,因此在初始字段提取之后,它会将所有后续syslog消息放入syslog值的message字段中。
下面是我的终结器管道,它尝试用信号通知“原子消息”行为。我不确定为什么它不能工作。
terminator :: C.Conduit BS.ByteString (ResourceT IO) BS.ByteString
terminator = C.awaitForever yieldAndAddTerminator
where
yieldAndAddTerminator bs = do
C.yield bs
C.yield terminator
terminator = ""如何将UDP消息视为管道世界中的原子数据片段?
这个代码库的副本可以在这里找到:https://github.com/tureus/safe-forwarder。
发布于 2015-01-19 21:46:38
您可能希望将parseToLogData与一个防止它使用新行的函数融合在一起(ASCII码10)。使用管道组合符术语,类似于:
takeWhileCE (/= 10) =$= parseToLogData
dropWhileCE (/= 10) >> dropCE 1 -- flush the rest of it您可能还想研究一下line组合器函数。
https://stackoverflow.com/questions/28021416
复制相似问题