首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在使用spring部署批处理消费者时获取java.lang.ClassCastException

在使用spring部署批处理消费者时获取java.lang.ClassCastException
EN

Stack Overflow用户
提问于 2020-11-11 19:12:25
回答 1查看 238关注 0票数 0

我使用Spring-Kafka2.2.8创建一个批处理消费者,并在部署我的使用者时得到一个异常。

代码语言:javascript
复制
java.lang.ClassCastException: org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter cannot be cast to org.springframework.kafka.listener.MessageListener
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:455) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:433) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:310) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:62) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:200) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:172) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:146) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:164) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:158) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:263) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:862) ~[spring-beans-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877) ~[spring-context-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:743) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:390) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.SpringApplication.run(SpringApplication.java:1214) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.SpringApplication.run(SpringApplication.java:1203) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at com.abc.xyz.yyy.kafka.integrationtestapi.Application.main(Application.java:10) ~[classes/:?]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_202]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_202]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_202]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_202]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) ~[app/:?]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) ~[app/:?]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.loader.Launcher.launch(Launcher.java:51) ~[app/:?]
   2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT     at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:52) ~[app/:?]

这是我的使用者配置

代码语言:javascript
复制
@Bean
public ConsumerFactory consumerFactory(){
    return new DefaultKafkaConsumerFactory(consumerConfigs(),stringKeyDeserializer(), avroValueDeserializer());
}
@Bean
public RetryPolicy getRetryPolicy(){
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    simpleRetryPolicy.setMaxAttempts(3);
    return simpleRetryPolicy;
}

@Bean
public FixedBackOffPolicy getBackOffPolicy() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(100);
    return backOffPolicy;
}

@Bean
public RetryTemplate getRetryTemplate(){
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(getRetryPolicy());
    retryTemplate.setBackOffPolicy(getBackOffPolicy());
    return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setStatefulRetry(true);
    factory.setRetryTemplate(getRetryTemplate());
    return factory;
}

现在我的问题是,

  1. 在设置retryTemplate时是否强制设置ErrorHandler?还是我在这里遗漏了导致这个异常的其他东西?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-11-11 19:50:38

批处理侦听器不支持RetryTemplate (我们不知道批处理中哪些记录失败)。

在这种情况下,最好在侦听器中直接使用RetryTemplate

像这样..。

代码语言:javascript
复制
@KafkaListener(...)
void listen(List<Foo> in) {
    in.forEach(foo -> {
        this.retryTemplate.execute(context -> {
            // process the foo
        }, context -> {
            // retries exhausted - save somewhere and move on to the next
        });
    });
}

从2.3.7版本开始,您可以配置一个RetryingBatchErrorHandler,然后重新尝试整个批处理。

从版本2.5开始,您可以配置一个RecoveringBatchErrorHandler,在这里您可以向处理程序抛出一个特殊的异常,该处理程序记录在批处理中失败,因此只有未处理的记录才会被重新传递。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64792728

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档