I am trying to build a pipeline with Apache Flume: spooldir -> Kafka channel -> HDFS sink.
Events make it into the Kafka topic without problems, and I can see them with kafkacat requests. But the Kafka channel cannot write files to HDFS through the sink. The error is:
Timed out while waiting for data to come from Kafka
Full log:
2016-02-26 18:25:17,125 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717) Got ping response for sessionid: 0x2524a81676d02aa after 0ms
2016-02-26 18:25:19,127 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717) Got ping response for sessionid: 0x2524a81676d02aa after 1ms
2016-02-26 18:25:21,129 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717) Got ping response for sessionid: 0x2524a81676d02aa after 0ms
2016-02-26 18:25:21,775 (SinkRunner-PollingRunner-DefaultSinkProcessor) DEBUG - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:327) Timed out while waiting for data to come from Kafka
kafka.consumer.ConsumerTimeoutException
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
My Flume configuration is:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c2
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/alex/spoolFlume
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://10.12.0.1:54310/logs/flumetest/
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic=flume_test_001
a1.channels.c2.zookeeperConnect=zoo00:2181,zoo01:2181,zoo02:2181
# Bind the source and sink to the channel
a1.sources.r1.channels = c2
a1.sinks.k1.channel = c2
With a memory channel instead of the Kafka channel, everything works fine.
Thanks in advance for any ideas!
Posted on 2016-02-27 11:09:18
ConsumerTimeoutException means that no new messages arrived for a while; it does not mean the connection to Kafka timed out.
http://kafka.apache.org/documentation.html
consumer.timeout.ms  -1  Throw a timeout exception to the consumer if no message is available for consumption after the specified interval.
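The semantics described above can be illustrated with a small Python analogy (not Kafka's API; `queue.Queue` stands in for the topic and `queue.Empty` for ConsumerTimeoutException):

```python
import queue

def take(q, timeout_ms):
    # Mimic the old high-level consumer's consumer.timeout.ms semantics:
    # -1 blocks indefinitely until a message arrives; a positive value
    # raises queue.Empty (standing in for ConsumerTimeoutException)
    # once the timeout expires with nothing to consume.
    if timeout_ms < 0:
        return q.get()
    return q.get(timeout=timeout_ms / 1000.0)

q = queue.Queue()
q.put("event-1")
print(take(q, 100))   # prints "event-1": a message was available in time

try:
    take(q, 100)      # nothing left to consume: raises after ~100 ms
except queue.Empty:
    print("timed out")
```

So the exception is a polling signal, not a broken connection, which matches the benign DEBUG-level log in the question.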
Posted on 2016-03-22 20:29:03
Kafka's ConsumerConfig class has a "consumer.timeout.ms" configuration property, which is set to -1 by default. Any new Kafka consumer should override this property with an appropriate value.
Here is the reference from the Kafka documentation:
consumer.timeout.ms -1
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. By setting the value to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout value.
When Flume creates a Kafka channel, it sets consumer.timeout.ms to 100, as can be seen in the INFO-level Flume logs. That explains why we see a ton of these ConsumerTimeoutExceptions:
level: INFO Post-validation flume configuration contains configuration for agents: [agent]
level: INFO Creating channels
level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
level: INFO Creating instance of channel c1 type org.apache.flume.channel.kafka.KafkaChannel
level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
level: INFO Group ID was not specified. Using flume as the group id.
level: INFO {metadata.broker.list=kafka:9092, request.required.acks=-1, group.id=flume,
zookeeper.connect=zookeeper:2181, **consumer.timeout.ms=100**, auto.commit.enable=false}
level: INFO Created channel c1
Following the Flume User Guide on Kafka channel settings, I tried to override this value by specifying the following, but that did not seem to work:
agent.channels.c1.kafka.consumer.timeout.ms=5000
Also, we continuously load-tested data through the channel, and this exception did not occur during the tests.
Posted on 2017-06-13 08:35:50
I read the Flume source code and found that Flume reads the value for "consumer.timeout.ms" from the "timeout" key.
So you can configure the "consumer.timeout.ms" value like this:
agent1.channels.kafka_channel.timeout=-1
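Applied to the agent from the question, the same override would sit alongside the other channel properties (a sketch based on the question's config; whether -1 is the right value depends on whether you prefer blocking over the harmless DEBUG-level timeouts):

```
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList = kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic = flume_test_001
a1.channels.c2.zookeeperConnect = zoo00:2181,zoo01:2181,zoo02:2181
# "timeout" is passed through as Kafka's consumer.timeout.ms;
# -1 makes the consumer block indefinitely instead of timing out
a1.channels.c2.timeout = -1
```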
https://stackoverflow.com/questions/35655973