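I have a seemingly simple Flume configuration that is giving me a lot of problems. Let me describe the problem first and then list the configuration files.
I have 3 servers: Server1, Server2, Server3.
Server1: Netcat source / Syslog-TCP source (I tested this with both: netcat without acks, and syslogtcp), 2 memory channels, 2 Avro sinks (one per channel), and a replicating selector with the second memory channel marked optional
Server2, Server3: Avro source, memory channel, Kafka sink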
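In my simulation, Server2 plays "production" and therefore must not experience any data loss, while Server3 plays "development", where losing data is fine. My assumption was that using two channels and two sinks would decouple the two servers from each other, so that if Server3 went down it would not affect Server2 (especially with the optional config option!). However, that is not the case. When I run my simulation and kill Server3 with CTRL+C, I see a slowdown on Server2: the output from Server2 to the Kafka sink slows to a crawl. When I start the Flume agent on Server3 again, everything returns to normal.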
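The coupling described above can be illustrated with a toy model. It is NOT Flume's implementation: `ToyChannel`, `replicate` and `simulate` are hypothetical names, and the model assumes that a put into a full memory channel waits for a bounded time before failing (the memory channel's `keep-alive` setting plays roughly this role). One thread copies every event into two bounded queues; nothing drains the optional one, which stands in for the Avro sink to Server3 being down.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model (not Flume code): one source replicating into a required and an optional channel. */
public class ReplicatingFanOutModel {

    /** Hypothetical stand-in for a memory channel: a bounded in-memory queue. */
    static final class ToyChannel {
        final BlockingQueue<String> queue;
        final boolean optional;

        ToyChannel(int capacity, boolean optional) {
            this.queue = new ArrayBlockingQueue<>(capacity);
            this.optional = optional;
        }
    }

    /** Summary of one simulated run. */
    static final class Result {
        final int requiredReceived;
        final int optionalReceived;
        final long elapsedMillis;

        Result(int requiredReceived, int optionalReceived, long elapsedMillis) {
            this.requiredReceived = requiredReceived;
            this.optionalReceived = optionalReceived;
            this.elapsedMillis = elapsedMillis;
        }
    }

    /**
     * Copies one event into every channel. A put that cannot complete within
     * putTimeoutMs is dropped; only failures on required channels are reported.
     */
    static boolean replicate(String event, List<ToyChannel> channels, long putTimeoutMs)
            throws InterruptedException {
        boolean allRequiredAccepted = true;
        for (ToyChannel ch : channels) {
            // offer(...) blocks the caller until space frees up or the timeout expires.
            boolean accepted = ch.queue.offer(event, putTimeoutMs, TimeUnit.MILLISECONDS);
            if (!accepted && !ch.optional) {
                allRequiredAccepted = false;
            }
        }
        return allRequiredAccepted;
    }

    /** Sends `events` events while nothing drains the optional channel (its downstream is "down"). */
    static Result simulate(int events, int optionalCapacity, long putTimeoutMs)
            throws InterruptedException {
        ToyChannel prod = new ToyChannel(events, false);         // large enough never to fill
        ToyChannel dev = new ToyChannel(optionalCapacity, true); // nobody ever drains this one
        List<ToyChannel> channels = List.of(prod, dev);

        long start = System.nanoTime();
        for (int i = 0; i < events; i++) {
            replicate("event-" + i, channels, putTimeoutMs);
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new Result(prod.queue.size(), dev.queue.size(), elapsedMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = simulate(10, 2, 20);
        System.out.println("prod channel received: " + r.requiredReceived);
        System.out.println("dev channel received:  " + r.optionalReceived);
        System.out.println("elapsed ms:            " + r.elapsedMillis);
    }
}
```

With 10 events and an optional channel of capacity 2, the required channel still receives all 10 events, but each of the last 8 puts waits out the full timeout on the optional channel, so the whole loop slows down even though nothing is lost on the "production" side. In this model the optional flag only keeps the failure from propagating; it does not remove the wait.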
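I did not expect this behavior. What I expected was that, because I have two channels and two sinks, if one channel and/or sink went down, the other channel and/or sink would not be affected. Is this a limitation of Flume? Is it a limitation of my source, sinks, or channels? Is there a way to make Flume behave this way when I use one agent with multiple channels and sinks that are decoupled from each other? I really don't want to run multiple Flume agents on every machine, one per "environment" (production and development). Attached are my configuration files, so you can see what I've done in more technical terms:
SERVER1 (first-tier agent)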
#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel1 defaultChannel2
agent.sinks = mySink1 mySink2
#Describe/configure the source
agent.sources.mySource.type = netcat
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.ack-every-event = false
#agent.sources.mySource.type = syslogtcp
#agent.sources.mySource.host = 0.0.0.0
#agent.sources.mySource.port = 7103
#agent.sources.mySource.eventSize = 150000
agent.sources.mySource.channels = defaultChannel1 defaultChannel2
agent.sources.mySource.selector.type = replicating
agent.sources.mySource.selector.optional = defaultChannel2
#Describe/configure the channel
agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 5000
agent.channels.defaultChannel1.transactionCapacity = 200
agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 5000
agent.channels.defaultChannel2.transactionCapacity = 200
#Avro Sink
agent.sinks.mySink1.channel = defaultChannel1
agent.sinks.mySink1.type = avro
agent.sinks.mySink1.hostname = Server2
agent.sinks.mySink1.port = 6666
agent.sinks.mySink2.channel = defaultChannel2
agent.sinks.mySink2.type = avro
agent.sinks.mySink2.hostname = Server3
agent.sinks.mySink2.port = 6666
SERVER2 "PROD" Flume agent
#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink
#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel
#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder
#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200
#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server2-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel
SERVER3 "DEV" Flume agent
#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink
#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel
#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder
#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200
#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server3-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel
Thanks for the help!
Posted on 2015-07-14 20:14:02
I would consider tuning these configuration parameters, since they relate to the memory channel:
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200
Perhaps try doubling them first and then run another test; you should see an improvement:
agent.channels.defaultChannel.capacity = 10000
agent.channels.defaultChannel.transactionCapacity = 400
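A small sketch of the arithmetic behind that suggestion. `MemoryChannelSizing` and its methods are hypothetical helpers, not part of Flume, and the 1000 events/s inbound rate is a made-up figure. It encodes two constraints as I understand them: the memory channel refuses a `transactionCapacity` larger than its `capacity`, and a sink whose `batchSize` exceeds the channel's `transactionCapacity` cannot take a full batch in one transaction. It also estimates how many seconds a channel, starting empty, can absorb while its sink is completely stalled.

```java
/** Hypothetical sizing helper (not part of Flume) for the memory-channel settings above. */
public class MemoryChannelSizing {

    /** Throws if the three settings are inconsistent with each other. */
    static void validate(int capacity, int transactionCapacity, int sinkBatchSize) {
        if (transactionCapacity > capacity) {
            throw new IllegalArgumentException("transactionCapacity (" + transactionCapacity
                    + ") must not exceed capacity (" + capacity + ")");
        }
        if (sinkBatchSize > transactionCapacity) {
            throw new IllegalArgumentException("sink batchSize (" + sinkBatchSize
                    + ") should not exceed transactionCapacity (" + transactionCapacity + ")");
        }
    }

    /** Seconds an initially empty channel can buffer while its sink delivers nothing. */
    static double secondsOfBuffer(int capacity, double eventsPerSecond) {
        return capacity / eventsPerSecond;
    }

    public static void main(String[] args) {
        int capacity = 5000, transactionCapacity = 200, batchSize = 100; // values from the question
        validate(capacity, transactionCapacity, batchSize);
        validate(capacity * 2, transactionCapacity * 2, batchSize);     // the "double it" suggestion

        double rate = 1000.0; // HYPOTHETICAL inbound rate; measure your own
        System.out.println("current buffer: " + secondsOfBuffer(capacity, rate) + " s");
        System.out.println("doubled buffer: " + secondsOfBuffer(capacity * 2, rate) + " s");
    }
}
```

At the assumed rate, doubling `capacity` from 5000 to 10000 doubles the cushion from 5 to 10 seconds; it delays the point at which the channel fills up rather than removing it.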
It would also be good to watch the JVM of the Apache Flume instances during the test.
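One thing worth checking while watching the JVM is the worst-case heap the memory channels can pin. On Server1, `max-line-length` is 150000, so each event body can be up to roughly 150 KB, and both channels live in the same agent. The sketch below (the class and method names are hypothetical) multiplies these out, ignoring headers and per-object overhead, and prints the current JVM's heap limit only for comparison; it does not inspect the Flume process itself.

```java
/** Hypothetical back-of-the-envelope estimate; not a Flume API. */
public class ChannelHeapEstimate {

    /** Upper bound on event-body bytes held by full memory channels (no headers, no object overhead). */
    static long worstCaseBodyBytes(int channels, long capacityPerChannel, long maxEventBytes) {
        return channels * capacityPerChannel * maxEventBytes;
    }

    public static void main(String[] args) {
        long maxEventBytes = 150_000L; // agent.sources.mySource.max-line-length on Server1
        long current = worstCaseBodyBytes(2, 5_000L, maxEventBytes);
        long doubled = worstCaseBodyBytes(2, 10_000L, maxEventBytes);
        long maxHeap = Runtime.getRuntime().maxMemory(); // heap limit of THIS JVM, for comparison only

        System.out.printf("worst case, capacity 5000:  %d MiB%n", current >> 20);
        System.out.printf("worst case, capacity 10000: %d MiB%n", doubled >> 20);
        System.out.printf("this JVM's max heap:        %d MiB%n", maxHeap >> 20);
    }
}
```

That is about 1.4 GiB of bodies at capacity 5000 and about 2.8 GiB at 10000, if every event were as large as the limit allows, so doubling the capacity may need a matching change to the agent's heap size.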
https://stackoverflow.com/questions/31056486