我有一个配置服务器和几个与Kafka一起工作的客户端。当我更新配置服务器上的配置属性时,我希望我的客户端不间断地工作。
下面是我的配置设置:
@Component
@RefreshScope
@ConfigurationProperties(prefix = "params")
public class ConfigParams {
private KafkaParams kafkaParams = new KafkaParams(); // some custom params (doesn't matter)
// ...
}@Configuration
public class KafkaConfig {
@Autowired
private ConfigParams configParams;
// ...
@Bean
@RefreshScope
MyKafkaBean myKafkaBean() {
return new MyKafkaBean(configParams.getKafkaParams()); // here I create my own kafka bean with topics, producers and consumers
}
// ...
}问题描述:
如果我将bootstrap.servers属性更改为不正确的,并且调用了卡夫卡生产者,就会有一个警告,其中包含一个消息,即生产者不能连接到kafka (这是正确的)。当我修复这个属性时,连接又回来了,一切正常,但我仍然警告您不要连接到代理的"producer-1“("producer-2”是为新连接创建的)。
P.S.的消费者工作正常,没有任何问题。
P.S.P.S.S.S.我已经尝试过用DefaultKafkaConsumerFactory#reset或DefaultKafkaConsumerFactory#destroy手动删除生产者,但是没有帮助。
P.S.我认为问题在于卡夫卡节省了生产者的缓存。但我不知道怎么解决。
我会感谢你的帮助。
发布于 2022-08-12 13:37:01
我也有过类似的问题。在处理刷新机制之前,我使用RefreshRemoteApplicationEvent侦听器手动关闭了生产者。例如:
@Slf4j
public class MyRefreshEventListener implements ApplicationListener<RefreshRemoteApplicationEvent> {
private final MyKafkaBean myKafkaBean;
public MyRefreshEventListener(@Autowired MyKafkaBean myKafkaBean) {
this.myKafkaBean = myKafkaBean;
}
@Override
public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
log.info("Refresh myKafkaBean:");
if (myKafkaBean != null) {
closeProducer(myKafkaBean);
}
}
private void closeProducer(Producer<String, String> producer) {
if (producer != null) {
producer.flush();
producer.close();
}
}
}https://stackoverflow.com/questions/69830098
复制相似问题