首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么一个可选的水槽通道会导致一个非可选的水槽通道有问题?

为什么一个可选的水槽通道会导致一个非可选的水槽通道有问题?
EN

Stack Overflow用户
提问于 2015-06-25 17:03:29
回答 1查看 462关注 0票数 1

我有一个看似简单的水槽配置,这给我带来了很多问题。让我先描述一下问题,然后列出配置文件。

我有3台服务器: Server1、Server2、Server3。

Server1: Netcat源/SyAdd.1-tcp源(我在两个netcat上都测试了这一点,没有acks和syslogtcp),2个内存通道,2个Avro接收器(每个通道一个),复制选择器,第二个内存通道(可选)

Server2,3 2,3: Avro源内存通道Kafka接收器

在我的模拟中,Server2模拟的是“生产”,因此不会经历任何数据损失,而Server3则模拟“开发”,而数据丢失则很好。我的假设是,使用两个通道和两个源将使两个服务器相互解耦,如果Server3下降,它不会影响Sever2 (特别是使用可选配置选项!)。然而,情况并非如此。当我运行我的模拟并使用CTRL终止Server3时,我在Server2上经历了减速,从Server2到Kafka接收器的输出变成了爬行。当我在Server3上继续做水槽代理时,一切都恢复正常了。

我没想到会有这种行为。我所期望的是,因为我有两个通道和两个水槽,如果一个通道和/或水槽下降,另一个通道和/或水槽就不会有问题。这是水槽的限制吗?这是对我的源、汇或频道的限制吗?是否有一种方法可以让Flume在我使用一个具有多个通道和接收器的代理时进行操作,这些通道和接收器是相互解耦的?我真的不想在每台“环境”(生产和开发)的每台机器上有多个水槽代理。附加的是我的配置文件,这样您就可以以更技术性的方式看到我所做的事情:

SERVER1 (第一层代理)

代码语言:javascript
复制
#Describe the top level configuration    
agent.sources = mySource
agent.channels = defaultChannel1 defaultChannel2
agent.sinks = mySink1 mySink2

#Describe/configure the source
agent.sources.mySource.type = netcat
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.ack-every-event = false
#agent.sources.mySource.type = syslogtcp
#agent.sources.mySource.host = 0.0.0.0
#agent.sources.mySource.port = 7103
#agent.sources.mySource.eventSize = 150000
agent.sources.mySource.channels = defaultChannel1 defaultChannel2
agent.sources.mySource.selector.type = replicating
agent.sources.mySource.selector.optional = defaultChannel2

#Describe/configure the channel
agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 5000
agent.channels.defaultChannel1.transactionCapacity = 200

agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 5000
agent.channels.defaultChannel2.transactionCapacity = 200

#Avro Sink
agent.sinks.mySink1.channel = defaultChannel1
agent.sinks.mySink1.type = avro
agent.sinks.mySink1.hostname = Server2
agent.sinks.mySink1.port = 6666

agent.sinks.mySink2.channel = defaultChannel2
agent.sinks.mySink2.type = avro
agent.sinks.mySink2.hostname = Server3
agent.sinks.mySink2.port = 6666

SERVER2 "PROD“水槽剂

代码语言:javascript
复制
#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server2-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel

SERVER3 "DEV“型水槽剂

代码语言:javascript
复制
#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server3-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel 

谢谢你的帮忙!

EN

回答 1

Stack Overflow用户

发布于 2015-07-14 20:14:02

我将考虑调整这个配置参数,因为它与内存通道有关:

agent.channels.defaultChannel.transactionCapacity = 5000 agent.channels.defaultChannel.capacity = 200

可能先尝试加倍,然后再进行一次测试,您应该看到改进:

agent.channels.defaultChannel.transactionCapacity = 10000 agent.channels.defaultChannel.capacity = 400

在测试期间观察Apache Flume实例的JVM也是很好的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31056486

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档