首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >必须指定flume-ng throws Kafka主题

必须指定flume-ng throws Kafka主题
EN

Stack Overflow用户
提问于 2018-07-10 06:35:29
回答 1查看 387关注 0票数 1

我正在尝试从我的kafka主题中提取数据并将其写入HDFS,并且我的flume conf似乎与我在几个示例中看到的相同,但我似乎无法绕过下面的错误。我可以通过python从主题中进行消费,所以我知道我可以做到这一点。我使用的是flume版本1.6.0和java 9.0.1。我做错了什么,让它不接受卡夫卡的话题?

代码语言:javascript
复制
09 Jul 2018 17:17:26,973 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:145) -Creating channels
09 Jul 2018 17:17:26,984 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:42)  - Creating instance of channel kafka_hdfs_channel type memory
09 Jul 2018 17:17:26,989 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:200)  - Created channel kafka_hdfs_channel

09 Jul 2018 17:17:26,989 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41)  - Creating instance of source kafka_source, type org.apache.flume.source.kafka.KafkaSource
09 Jul 2018 17:17:26,993 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:361)  - Source kafka_source has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: Kafka topic must be specified.
    at org.apache.flume.source.kafka.KafkaSource.configure(KafkaSource.java:180)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.base/java.lang.Thread.run(Thread.java:844)}

这是我的flume配置:

代码语言:javascript
复制
agentCDIS.sources = kafka_source
agentCDIS.channels = kafka_hdfs_channel
agentCDIS.sinks = hdfs_sink

agentCDIS.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agentCDIS.sources.kafka_source.kafka.bootstrap.servers = 10.4.3.61:9092, 10.4.3.62:9092, 10.4.3.63:9092
agentCDIS.sources.kafka_source.kafka.topic = test
agentCDIS.sources.kafka_source.kafka.consumer.group.id = cn_flume_group
agentCDIS.sources.kafka_source.channels = kafka_hdfs_channel
agentCDIS.sources.kafka_source.interceptors = i1
agentCDIS.sources.kafka_source.interceptors.i1.type = timestamp
agentCDIS.sources.kafka_source.kafka.consumer.timeout.ms = 1000

agentCDIS.channels.kafka_hdfs_channel.type = memory
agentCDIS.channels.kafka_hdfs_channel.capacity = 10000
agentCDIS.channels.kafka_hdfs_channel.transactionCapacity = 1000

agentCDIS.sinks.hdfs_sink.type = hdfs
agentCDIS.sinks.hdfs_sink.hdfs.path = hdfs://10.4.16.16:8020/user/cnelson/kafka/%{topic}/%y-%m-%d
agentCDIS.sinks.hdfs_sink.hdfs.rollInterval = 5
agentCDIS.sinks.hdfs_sink.hdfs.rollSize = 0
agentCDIS.sinks.hdfs_sink.fileType = DataStream
agentCDIS.sinks.hdfs_sink.channel = kafka_hdfs_channel

agentCDIS.sinks.loggerSink.type = logger
agentCDIS.sinks.loggerSink.kafka_hdfs_channel = memoryChannel

agentCDIS.channels.memoryChannel.type = memory
agentCDIS.channels.memoryChannel.capacity = 100
EN

回答 1

Stack Overflow用户

发布于 2018-07-23 05:28:18

我看了几遍帖子和配置,注意到-您提到您使用的是Flume的1.6版本,根据documentation,属性略有不同。您可以尝试以下操作吗:

  1. 而不是agentCDIS.sources.kafka_source.kafka.bootstrap.servers => try agentCDIS.sources.kafka_source.topic = test
  2. Instead -此属性的值为您的Kafka cluster.
  3. Instead of agentCDIS.sources.kafka_source.kafka.topic = test => try cluster.
  4. Instead of agentCDIS.sources.kafka_source.kafka.consumer.group.id = cn_flume_group => try agentCDIS.sources.kafka_source.topic = test
  5. Instead使用的zookeeper URI

您在配置文件中使用的上述3个属性是从version 1.7引入的。

我希望这能帮到你!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51254918

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档