
Apache : kafka.consumer.ConsumerTimeoutException

Stack Overflow user
Asked on 2016-02-26 15:44:18
3 answers · 924 views · 0 followers · 0 votes

I am trying to build a pipeline with Apache Flume: spooldir -> Kafka channel -> HDFS sink

Events get into the Kafka topic without any problem, and I can see them with kafkacat requests. But the Kafka channel cannot write the files to HDFS through the sink. The error is:

Timed out while waiting for data to come from Kafka

Full log:

2016-02-26 18:25:17,125 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717) Got ping response for sessionid: 0x2524a81676d02aa after 0ms
2016-02-26 18:25:19,127 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717) Got ping response for sessionid: 0x2524a81676d02aa after 1ms
2016-02-26 18:25:21,129 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717) Got ping response for sessionid: 0x2524a81676d02aa after 0ms
2016-02-26 18:25:21,775 (SinkRunner-PollingRunner-DefaultSinkProcessor) DEBUG - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:327) Timed out while waiting for data to come from Kafka
kafka.consumer.ConsumerTimeoutException
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)

My Flume configuration is:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c2

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/alex/spoolFlume

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =  hdfs://10.12.0.1:54310/logs/flumetest/
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.channels.c2.type   = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic=flume_test_001
a1.channels.c2.zookeeperConnect=zoo00:2181,zoo01:2181,zoo02:2181

# Bind the source and sink to the channel
a1.sources.r1.channels = c2
a1.sinks.k1.channel = c2

With a memory channel instead of the Kafka channel, everything works fine.

Thanks in advance for any ideas!


3 Answers

Stack Overflow user

Posted on 2016-02-27 11:09:18

ConsumerTimeoutException means that no new messages have arrived for a long while; it does not mean that the connection to Kafka is down.

http://kafka.apache.org/documentation.html

consumer.timeout.ms (default: -1): throws a timeout exception to the consumer if no message is available for consumption after the specified interval.
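As a minimal illustration (not part of the original answer), this is roughly how that property would look in a stand-alone properties file for the old (0.8.x) high-level consumer; the group id and ZooKeeper hosts are taken from the question, and the 5000 ms timeout is just an assumed example value:

group.id=flume
zookeeper.connect=zoo00:2181,zoo01:2181,zoo02:2181
# -1 (the default) blocks indefinitely; any positive value makes the consumer
# throw kafka.consumer.ConsumerTimeoutException when no message arrives within that many ms
consumer.timeout.ms=5000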

Votes: 0

Stack Overflow user

Posted on 2016-03-22 20:29:03

Kafka's ConsumerConfig class has a "consumer.timeout.ms" configuration property, which is set to -1 by default. Any new Kafka consumer should override this property with an appropriate value.

Here is the reference from the Kafka documentation:

consumer.timeout.ms     -1  
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. By setting the value to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout value.

When Flume creates a Kafka channel, it sets the consumer.timeout.ms value to 100, as can be seen in the Flume logs at INFO level. That explains why we see a ton of these ConsumerTimeoutExceptions.

 level: INFO Post-validation flume configuration contains configuration for agents: [agent]
 level: INFO Creating channels
 level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
 level: INFO Creating instance of channel c1 type org.apache.flume.channel.kafka.KafkaChannel
 level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
 level: INFO Group ID was not specified. Using flume as the group id.
 level: INFO {metadata.broker.list=kafka:9092, request.required.acks=-1, group.id=flume, 
              zookeeper.connect=zookeeper:2181, **consumer.timeout.ms=100**, auto.commit.enable=false}
 level: INFO Created channel c1

Following the Flume User Guide on Kafka channel settings, I tried to override this value by specifying the following, but that did not seem to work:

agent.channels.c1.kafka.consumer.timeout.ms=5000

Also, we are constantly pumping data through the channel for load testing, and this exception did not pop up during the tests.

Votes: 0

Stack Overflow user

Posted on 2017-06-13 08:35:50

I read the Flume source code and found that Flume reads the value for "consumer.timeout.ms" from the key "timeout".

So you can configure the value of "consumer.timeout.ms" like this:

agent1.channels.kafka_channel.timeout=-1
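As a usage sketch (assumed, not from the original answer), this is how the key would sit in the channel section of the question's configuration, alongside the other settings already shown there:

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList = kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.zookeeperConnect = zoo00:2181,zoo01:2181,zoo02:2181
a1.channels.c2.topic = flume_test_001
# per this answer, Flume maps the "timeout" key onto the consumer's consumer.timeout.ms;
# -1 lets the channel's consumer block instead of throwing ConsumerTimeoutException
a1.channels.c2.timeout = -1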

Votes: 0
The original content of this page was provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/35655973
