我有一个流,在一个目录中观察多个文件的输出,处理数据并将其放到HDFS中。下面是我的stream creat命令:
stream create --name fileHdfs --definition "file --dir=/var/log/supervisor/ --pattern=tracker.out-*.log --outputType=text/plain | logHdfsTransformer | hdfs --fsUri=hdfs://192.168.1.115:8020 --directory=/data/log/appsync --fileName=log --partitionPath=path(dateFormat('yyyy/MM/dd'))" --deploy问题的根源是:文件模块将所有从文件读取的数据发送到日志处理模块,而不是每轮一行,因此,有效负载字符串有数百万个字符,我无法处理它。例如:
--- PAYLOAD LENGTH---- 9511284请告诉我在使用source:file模块时如何逐行阅读,谢谢!
发布于 2014-08-01 00:02:36
它目前还不受支持,但是使用Spring Integration inbound-channel-adapter来调用一次读取一行的POJO来编写自定义source会很容易。
您也可以在XD中使用job而不是stream来实现。
发布于 2015-03-04 03:33:38
我知道这可能为时已晚,但对于任何正在寻找解决方案的谷歌人来说:
尽管没有自动执行此操作的模块或选项,但只需添加一个拆分器即可将传入消息分离为多个传出消息。
请注意,您必须在使用\n和\r\n之间做出选择。请检查您的文件以了解它们使用的是什么。
示例:
stream create --name filetest --definition "file --outputType=text/plain --dir=/tmp/examplefiles/| splitter --expression=payload.split('\\n') | log" --deploy干杯!
发布于 2015-05-20 08:25:12
Spring Integration有一个将文本文件拆分成行的FileSplitter。您可以使用它来创建自定义处理器模块,让我们称之为文件拆分:
.xml文件:
数据流创建--名称日志--定义“文件--目录=/ splitFile --ref=true |文件分割|日志”--部署
如果需要,您可以进一步自定义模块以获取更多选项。
https://stackoverflow.com/questions/25062844
复制相似问题