首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Camel-Kafka ConsumerCount从定义的值降到1(默认值)。

Camel-Kafka ConsumerCount从定义的值降到1(默认值)。
EN

Stack Overflow用户
提问于 2019-09-24 09:57:56
回答 2查看 1.1K关注 0票数 1

我使用卡夫卡消费者在我们的应用程序与骆驼(骆驼-卡夫卡)。我们使用这些消息并将数据发送到服务器进行处理。有一个主题"XYZ“被定义为30个分区,我已经确定了15个作为每个消费者节点的使用者计数(总共2个实例)‘

CamelConsumer配置

代码语言:javascript
复制
kafka.consumersCount=15
kafka.consumerStreams=15

我从日志中看到,当使用者启动时,有15个使用者线程(假设在1个节点上),这在配置上是很好的。

代码语言:javascript
复制
INFO  Camel (camel-1) thread #2 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ
INFO  Camel (camel-1) thread #3 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ
INFO  Camel (camel-1) thread #4 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ
INFO  Camel (camel-1) thread #5 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ
INFO  Camel (camel-1) thread #6 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ
INFO  Camel (camel-1) thread #7 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ
INFO  Camel (camel-1) thread #8 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ
INFO  Camel (camel-1) thread #9 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ
INFO  Camel (camel-1) thread #10 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ
INFO  Camel (camel-1) thread #11 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ
INFO  Camel (camel-1) thread #12 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ
INFO  Camel (camel-1) thread #13 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ
INFO  Camel (camel-1) thread #14 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ
INFO  Camel (camel-1) thread #15 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ
INFO  Camel (camel-1) thread #16 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ

如果服务器由于网络问题或服务器不可用时的任何其他情况而停止响应。卡夫卡的所有消费者开始取消订阅,这也是一种预期的行为(到目前为止还不错)。

注意:我们还定义了Camel ThrottlingExceptionRoutePolicy,它在发送所消耗的消息之前在服务器上执行健康检查调用。

在服务器返回和可用之后,我看到不是所有15个使用者线程都是活动的,而只有一个。从下面的日志中,我观察到它一个接一个地被订阅和取消订阅,最后应用程序只运行一个使用者计数。

代码语言:javascript
复制
INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ListOfDefinedServers]
check.crcs = true
client.id = 
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = XYZ-GroupId-12345
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 5000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = HTTPS
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = cert.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = cert.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator servername (id: 2147482644 rack: null)
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously assigned partitions []
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group with generation 1
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23, XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12, XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10, XYZ-4, XYZ-6]

任何知道为什么骆驼没有运行与定义的消费者计数,而是运行与一个单一的消费者。当负载更多时,这会影响使用者应用程序的处理速度。

有什么帮助或建议吗这里出了什么问题?

我期望15个消费者线程运行后,故障,这是定义的骆驼卡夫卡配置。

EN

回答 2

Stack Overflow用户

发布于 2020-08-11 16:37:27

我们遇到了同样的问题。

设置如下:

  • 弹簧-启动: 2.2.x
  • 骆驼版: 2.24.1
  • 主题有3个分区。
    • kafka.consumersCount=3 (每个分区1)
    • kafka.consumersStreams= 10默认值

  • ThrottlingExceptionRoutePolicy -如果出现异常,请打开电路。

在camel组件中,KafkaConsumer虽然实现了SuspendableService接口,但它并不实现可挂起的接口.因此,使用者需要关闭,并开始模拟挂起/恢复功能。

这就是当您的openCircuit()和closeCircuit()

  • KafkaConsumer (Singleton) bean使用AtomicBoolean字段--启动、启动、停止、停止、挂起、挂起、切换、关机以跟踪其当前状态。
  1. 一旦声明了应用程序的camel路由,状态就变成: KafkaConsumer {KafkaConsumer{启动=假,停止=假,停止=假,关闭=假}
  2. 当电路打开时,状态为: KafkaConsumer { started=false,starting= false,stopping=false,stopping=false关机= false }
  3. 当电路关闭启动时,KafkaConsumer状态仍然是:KafkaConsumer和启动卡夫卡消费者启动(恢复)进程。

KafkaConsumer:

代码语言:javascript
复制
  doStart(){
      // create executor with worker thread count = consumerStreams 
      for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
           KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
           // pre-initialize task during startup so if there is any error we have it thrown asap
           task.preInit();
           executor.submit(task);  
           tasks.add(task);
      }    
  }

KafkaFetchRecords:

代码语言:javascript
复制
doRun(){
   clientConsumer.subscribe(topic);
   while( isRunAllowed() && !reConnect && !isStoppingOrStopped() && !isSuspendingOrSuspended()){
      clientConsumer.poll();
   }
   clientConsumer.unsubscribe(topic) 
} 
  • 因为executor.submit()不延迟地触发doRun(),所以KafkaConsumer状态仍然是停止,while循环被跳过,kafka消费者客户端结束对主题的取消订阅。
  • 一旦doStart()的for循环完成,KafkaConsumer的状态被标记为 loop = false,started = true
  • 因此,对于最后一个(KafkaFetchRecord)任务的doRun()调用,输入while循环,并且使用者继续轮询消息。
  • 可能的解决办法是:
代码语言:javascript
复制
- replace the ExecutorService with ScheduledExecutor to schedule the task with delay
- implement the suspendable interface and support suspend/resume feature. underlying kafka client library already supports suspend/resume feature.
票数 1
EN

Stack Overflow用户

发布于 2020-08-13 04:38:10

这是用骆驼-12765在Camel 3.0.0版中解决的。

试着升级到camel 3.x

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58077699

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档