我们使用的是spring,kafka(spring kafka 2.4.5,spring cloud stream 3.0.1),openshift。我们有以下配置。多个代理/主题,每个主题有8个分区,复制因子为3,多个spring boot消费者。
作为弹性测试的一部分,当我们关闭其中一个代理时,我们会得到下面的异常,甚至在我们启动服务器之后,仍然会得到相同的错误。
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2020-05-19 18:39:57.598 ERROR [service,,,] 1 --- [ad | producer-5] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='{49, 50, 49, 50, 54, 53, 56}' and payload='{123, 34, 115, 111, 117, 114, 99, 101, 34, 58, 34, 72, 67, 80, 77, 34, 44, 34, 110, 97, 109, 101, 34...' to topic topicname
2020-05-19 18:39:57.598 WARN [service,,,] 1 --- [ad | producer-5] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-5] Received invalid metadata error in produce request on partition topicname-4 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now我检查了谷歌,大多数人说,将重试值更改为大于1会起作用,但由于即使在broker启动后也会出现错误,我不确定它是否起作用。
这是我在属性文件中的内容:
spring.cloud.stream.kafka.binder.brokers=${output.server}
spring.cloud.stream.kafka.binder.requiredAcks=1
spring.cloud.stream.bindings.outputChannel.destination=${output.topic}
spring.cloud.stream.bindings.outputChannel.content-type=application/json以及一行使用kafka streams API发送消息的代码。
`client.outputChannel().send(MessageBuilder.withPayload(message).setHeader(KafkaHeaders.MESSAGE_KEY, message.getId().getBytes()).build());`请帮帮我。
谢谢公羊
发布于 2020-05-20 16:18:51
代理停机
当关闭其中一个代理时,生产者的元数据更新可能还没有更新。因此,当它尝试为该代理中的分区发送数据时,它将失败,然后请求元数据更新,并再次尝试正确的代理,该代理现在是新的领导者。
代理重启
当代理再次添加回集群时,控制器将触发主题分区的重新平衡。因此,尚未使用此新元数据更新的生产者在尝试将数据发送到分区领导者已更改的代理时,也将显示失败的操作。在下一次重试时更新新元数据后,应该不会发生这种情况。
https://stackoverflow.com/questions/61902342
复制相似问题