我实现了一个自定义侦听器,即不使用@KafkaListener注释,因为我的应用程序需要动态链接到主题。我已经看到了迁移到Spring kafka 2.6.x的建议,但我不能升级,因为我被Spring 5.1.X.RELEASE卡住了(至少现在是这样),这意味着我只能使用Spring-kafka 2.2.x。
我的问题是,如何使用Spring-kafka 2.2.x实现重试、恢复和错误处理?
ConcurrentKafkaListenerContainerFactory listenerFactory = new ConcurrentKafkaListenerContainerFactory();
listenerFactory.setConsumerFactory(consumerFactory);
configurer.configure(listenerFactory, consumerFactory);
listenerFactory.setConcurrency(listenerConcurrency);
listenerFactory.setStatefulRetry(Boolean.TRUE);
listenerFactory.setBatchListener(isBatchListener);
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
listenerFactory.getContainerProperties().setAckOnError(false);
listenerFactory.setRetryTemplate(retryTemplate(kafkaEhCacheRetryManager));
listenerFactory.setRecoveryCallback(kafkaRecoverer);我的重试模板如下所示:
RetryTemplate retryTemplate(EhCacheCacheManager kafkaEhCacheRetryManager) {
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(initialIntervalForRetries);
exponentialBackOffPolicy.setSleeper(new ThreadWaitSleeper());
exponentialBackOffPolicy.setMultiplier(2.0);
exponentialBackOffPolicy.setMaxInterval(maxIntervalForRetries);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryContextCache(new KafkaEhRetryContextCache(kafkaEhCacheRetryManager));
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
// KafkaTransactionalRetryPolicy extends SimpleRetryPolicy
KafkaTransactionalRetryPolicy retryPolicy = new KafkaTransactionalRetryPolicy(kafkaTemplate);
retryPolicy.setMaxAttempts(maxAttempts);
retryTemplate.setRetryPolicy(kafkaTransactionalRetryPolicy);
return retryTemplate;
}我的监听器看起来像这样:
public class MyKafkaListener implements MessageListener<String, String> {
@Override
@Transactional(value = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord){
throw new RuntimeException("thrown out of out anger");
}
}使用此配置:
spring.kafka:
bootstrap-servers: ${service.kakfa.host}
admin:
client-id: test-consumers
bootstrap-servers: ${service.kakfa.host}
consumer:
bootstrap-servers: ${service.kakfa.host}
group-id: local-consumers
client-id: local-consumers
auto-offset-reset: earliest
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
isolation-level: read_committed
producer:
bootstrap-servers: ${service.kakfa.host}
client-id: local-producer
acks: all
retries: 3
transaction-id-prefix: local-producer-tx-
properties:
enable.idempotence: true
transactional.id: tran-id-1-
max.in.flight.requests.per.connection: 5
listener.concurrency: 1我在StackOverflow上看到了几个关于如何做到这一点的例子,但到目前为止还没有一个有效。
发布于 2021-02-16 23:11:48
您尝试使用的重试机制仅适用于@KafakListener%s。它内置于用于调用监听程序POJO的监听程序适配器中。
在较新的版本中,SeekToCurrentErrorHandler和DefaultAfterRollbackProcessor有一个后退(从2.3开始),不再需要侦听器级别的重试模板,而是支持容器级别的重试。
对于您自己的侦听器,您必须在侦听器代码本身中使用RetryTemplate。
顺便说一句,https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions#supported-versions不再支持Spring5.1.x
https://stackoverflow.com/questions/66183607
复制相似问题