我正在尝试通过flume将数据从kafka放入hdfs中。kafka_producer每10秒发送一条消息。我想在hdfs上将所有消息收集到一个文件中。这是我使用的flume的配置,但它在hdfs上存储了许多文件(一个用于消息):
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = localhost:2181
agent1.sources.kafka-source.topic = prova
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/input
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink附言:我是从file.csv开始的。kafka生产者获取文件并选择一些感兴趣的字段,然后每10秒发送一个条目。Flume将条目存储在hadoop hdfs上,但存储在许多文件中(1个entry=1文件)。我希望所有的条目都在一个唯一的文件中。如何改变水槽的配置?
发布于 2017-07-13 06:16:38
看来flume当前确实被设置为在HDFS上为每个输入文件创建一个文件。
按照here的建议,您可以通过编写一个周期性的pig (或mapreduce)作业来处理此问题,该作业获取所有输入文件并将它们组合在一起。
减少文件数量的另一个选项可能是降低入站文件的频率。
发布于 2017-07-13 17:32:46
将rollInterval设置为0,因为您不希望根据时间创建不同的文件。如果您希望基于数字输入或事件,请更改rollCount值。例如,如果要在单个文件中保存10个事件或条目:
agent1.sinks.hdfs-sink.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 10https://stackoverflow.com/questions/44969285
复制相似问题