
Failed to create consumer binding; retrying in 30 seconds

Stack Overflow user
Asked 2019-04-17 20:04:16
2 answers · 7.4K views · 0 followers · score 1

I am using Spring Cloud Stream with two channels: one consumes from Kafka cluster1 and the other from cluster2. The configuration looks like this:

spring.cloud.stream.default-binder=kafka
spring.cloud.stream.binders.kafka.type=kafka
spring.cloud.stream.binders.kafka.environment.spring.cloud.stream.kafka.binder.brokers=xxxx

spring.cloud.stream.kafka.binder.auto-add-partitions=false
spring.cloud.stream.kafka.binder.auto-create-topics=false

spring.cloud.stream.bindings.channel1-input.destination=top1
spring.cloud.stream.bindings.channel1-input.group=mygroup
spring.cloud.stream.bindings.channel1-input.consumer.concurrency=3
spring.cloud.stream.bindings.channel1-input.consumer.partitioned=true
spring.cloud.stream.bindings.channel1-input.content-type=application/json


spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=xxx

spring.cloud.stream.bindings.channel-input2.content-type=application/protocol-buffer
spring.cloud.stream.bindings.channel-input2.destination=topipc2
spring.cloud.stream.bindings.channel-input2.group=mygroup
spring.cloud.stream.bindings.channel-input2.producer.headerMode=raw
spring.cloud.stream.bindings.channel-input2.binder=kafka2

When I run the application, I can consume messages from both topics, but I also get the error below, which confuses me. Any ideas?

2019-04-17 19:51:54.502 [task-scheduler-4] ERROR o.s.cloud.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:391)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getPartitionInfo(KafkaMessageChannelBinder.java:573)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:331)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:126)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:279)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:77)
        at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:129)
        at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$0(BindingService.java:154)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$2(KafkaTopicProvisioner.java:373)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:370)
        ... 15 common frames omitted

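2 Answers

Stack Overflow user

Answered 2019-04-17 20:40:06

An NPE is a sign of a bug on our side; we need to fix it regardless, so please open an issue here.

That said, I wonder whether you have a Kafka configuration or security issue that prevents you from creating partitions, because the following code seems to return null: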

CreatePartitionsResult partitions = adminClient
        .createPartitions(Collections.singletonMap(topicName,
                NewPartitions.increaseTo(effectivePartitionCount)));
Score: 1

Stack Overflow user

Answered 2019-04-18 10:58:26

In the end I found the root cause: I had enabled @EnableBinding(Sink.class), but my Kafka server does not auto-create topics. After I removed @EnableBinding(Sink.class), the problem was solved.
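
A likely explanation, offered as an assumption rather than something the answer confirms: `Sink` declares a binding named `input`, and when a binding has no `destination` set, Spring Cloud Stream uses the binding name as the topic name. With `auto-create-topics=false`, a missing `input` topic on the default binder's cluster would then make the binding fail on every 30-second retry. If the `Sink` binding were actually needed, an alternative to removing the annotation might be to point it at a topic that already exists. A minimal sketch; `existing-topic` is a placeholder, not a name from the question:

```properties
# Assumption: the application still needs the "input" binding declared by Sink.
# "existing-topic" is a hypothetical topic that already exists on the broker
# used by the default binder (kafka).
spring.cloud.stream.bindings.input.destination=existing-topic
spring.cloud.stream.bindings.input.group=mygroup
```

With this in place the binder would look up partitions for `existing-topic` instead of a topic literally named `input`. If nothing in the application listens on `Sink.INPUT`, removing `@EnableBinding(Sink.class)`, as the answer does, is the simpler fix.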

Score: 0
Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/55727177
