Spring Boot Kafka SerializationException caused by: Cannot resolve PropertyFilter with id; no FilterProvider configured

Stack Overflow user
Asked 2021-09-12 07:47:52
Answers: 1 · Views: 199 · Followers: 0 · Votes: 0

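I have a DTO class that needs to be sent to Kafka, but the class is annotated with @JsonFilter, so serialization fails. I tried registering a filter provider on the ObjectMapper as shown below, but that did not work either.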

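import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;

@Configuration
public class FilterConfiguration {
    // The injected ObjectMapper is the bean provided by Spring Boot.
    public FilterConfiguration (ObjectMapper objectMapper) {
        SimpleFilterProvider simpleFilterProvider = new SimpleFilterProvider().setFailOnUnknownId(true);
        simpleFilterProvider.addFilter("PaymentFilter", SimpleBeanPropertyFilter.serializeAll());

        objectMapper.setFilterProvider(simpleFilterProvider);
    }
}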

Spring Boot version: 2.5.4

Spring Boot application.yml configuration:

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      acks: "all"
      compression-type: "gzip"
      properties:
        retries: 10
        "enable.idempotence": true
    consumer:
      auto-offset-reset: earliest
      client-id: "machineName"
      isolation-level: READ_COMMITTED
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: ${spring.application.name}
      enable-auto-commit: false
      properties:
        "spring.json.trusted.packages": "com.lalashree.app.*"
        "group.instance.id": "machineName"

Here is the full error stack trace:

org.apache.kafka.common.errors.SerializationException: Can't serialize data [PaymentInfoDTO(super=AbstractAuditDetailDTO(createdBy=null, creationTime=null, lastModifiedBy=null, lastModificationTime=null, deleted=null, deletedBy=null, deletionTime=null), id=ravi 01, customerId=null, orderId=null, txnAmount=null, status=null, txnToken=null, txnUrl=null)] for topic [payment_PaymentInfoDTO]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
    reactor.core.publisher.Mono.flatMap(Mono.java:3047)
    org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:137)
Error has been observed at the following site(s):
    |_  Mono.flatMap ⇢ at org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:137)
    |_    Mono.defer ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:199)
    |_     Mono.then ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:199)
    |_ Mono.doOnNext ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:200)
    |_ Mono.doOnNext ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:201)
Stack trace:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot resolve PropertyFilter with id 'PaymentFilter'; no FilterProvider configured
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ser.std.StdSerializer.findPropertyFilter(StdSerializer.java:426) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFieldsFiltered(BeanSerializerBase.java:811) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:176) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1514) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectWriter._writeValueAndClose(ObjectWriter.java:1215) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1108) ~[jackson-databind-2.12.4.jar:2.12.4]
    at org.springframework.kafka.support.serializer.JsonSerializer.serialize(JsonSerializer.java:195) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.support.serializer.JsonSerializer.serialize(JsonSerializer.java:185) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886) ~[kafka-clients-2.7.1.jar:na]
    at brave.kafka.clients.TracingProducer.send(TracingProducer.java:129) ~[brave-instrumentation-kafka-clients-5.13.2.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:864) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:580) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:380) ~[spring-kafka-2.7.6.jar:2.7.6]
    at com.lalashree.app.payment.web.rest.PaymentController.callback(PaymentController.java:83) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:146) ~[spring-webflux-5.3.9.jar:5.3.9]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:398) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:398) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:398) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:468) ~[reactor-netty-core-1.0.10.jar:1.0.10]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) ~[reactor-netty-core-1.0.10.jar:1.0.10]
    at reactor.netty.channel.FluxReceive.request(FluxReceive.java:129) ~[reactor-netty-core-1.0.10.jar:1.0.10]
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:498) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:498) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:498) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onSubscribe(MonoCollect.java:103) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:482) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:482) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:482) ~[reactor-core-3.4.9.jar:3.4.9]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.9.jar:3.4.9]
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:167) ~[reactor-netty-core-1.0.10.jar:1.0.10]
    at reactor.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:146) ~[reactor-netty-core-1.0.10.jar:1.0.10]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.67.Final.jar:4.1.67.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
    at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]

1 Answer

Stack Overflow user

Answered 2021-09-12 08:32:33

After debugging, I found that the ObjectMapper instance I was configuring is not the same instance that Spring Kafka uses during serialization.
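The effect of configuring one ObjectMapper while another one does the serializing can be reproduced with plain Jackson, outside Spring and Kafka. This is a minimal sketch, not the asker's code: `FilterMapperDemo`, `PaymentInfo`, `configuredMapper` and `tryWrite` are hypothetical names, and `PaymentInfo` is a one-field stand-in for the real DTO. It only assumes jackson-databind on the classpath.

```java
import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;

public class FilterMapperDemo {

    // Hypothetical stand-in for the @JsonFilter-annotated DTO in the question.
    @JsonFilter("PaymentFilter")
    public static class PaymentInfo {
        public String id = "ravi 01";
    }

    // Builds a mapper with the same filter setup as the question's FilterConfiguration.
    static ObjectMapper configuredMapper() {
        SimpleFilterProvider filters = new SimpleFilterProvider().setFailOnUnknownId(true);
        filters.addFilter("PaymentFilter", SimpleBeanPropertyFilter.serializeAll());
        ObjectMapper mapper = new ObjectMapper();
        mapper.setFilterProvider(filters);
        return mapper;
    }

    // Serializes the DTO with the given mapper and reports a failure as text.
    static String tryWrite(ObjectMapper mapper) {
        try {
            return mapper.writeValueAsString(new PaymentInfo());
        } catch (JsonProcessingException e) {
            return "FAILED: " + e.getOriginalMessage();
        }
    }

    public static void main(String[] args) {
        // Mapper that knows the filter: serialization succeeds.
        System.out.println(tryWrite(configuredMapper()));
        // A separate, unconfigured mapper: no FilterProvider, so serialization fails.
        System.out.println(tryWrite(new ObjectMapper()));
    }
}
```

The second call plays the role of the mapper inside the Kafka `JsonSerializer`: it never saw the filter provider, so it cannot resolve the `PaymentFilter` id even though another mapper in the same process can.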

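I found a solution. Essentially, we need to customize the ObjectMapper that is used during serialization. To do that, we extend Spring's JsonSerializer and hand it an ObjectMapper instance that carries our configuration. The instance I am configuring is the default ObjectMapper provided by Spring Boot. So here is the solution (if anyone has a better suggestion, please share it):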

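Producer properties (under spring.kafka.producer in application.yml):
value-serializer: com.abcd.MyJsonSerializer

MyJsonSerializer.java:
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.fasterxml.jackson.databind.JavaType;

public class MyJsonSerializer<T> extends JsonSerializer<T> {

    // Kafka creates this class through its no-arg constructor, so the
    // Spring-managed ObjectMapper is fetched from the static holder below.
    public MyJsonSerializer() {
        super((JavaType) null, StaticBeanUtil.getInstance().getObjectMapper());
    }

}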

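StaticBeanUtil.java:
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.Data;
import lombok.RequiredArgsConstructor;

// Lombok: @RequiredArgsConstructor injects the final fields,
// @Data generates getObjectMapper() used by MyJsonSerializer.
@Component
@RequiredArgsConstructor
@Data
public class StaticBeanUtil implements InitializingBean {
    private static StaticBeanUtil INSTANCE;

    private final ApplicationContext applicationContext;
    private final ObjectMapper objectMapper;

    // Publishes this bean statically once Spring has initialized it.
    @Override
    public void afterPropertiesSet() throws Exception {
        INSTANCE = this;
    }

    public static StaticBeanUtil getInstance() {
        return INSTANCE;
    }

    public <T> T getBean(Class<T> beanClass) {
        return applicationContext.getBean(beanClass);
    }

}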
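
One possible alternative, offered as a sketch rather than as part of the original answer: instead of a static holder, the serializer can be constructed programmatically and given the Spring-managed ObjectMapper directly. `KafkaProducerConfig` is a hypothetical class name. The sketch assumes Spring Boot 2.5.x, where `KafkaProperties.buildProducerProperties()` takes no arguments, and it assumes that Boot's auto-configured `ProducerFactory` and `KafkaTemplate` back off once these beans are defined.

```java
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical configuration class; the name is not from the original thread.
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory(KafkaProperties properties,
                                                           ObjectMapper objectMapper) {
        // Reuse the spring.kafka.producer.* settings from application.yml.
        Map<String, Object> configs = properties.buildProducerProperties();
        // Pass serializer instances so the JsonSerializer uses the injected ObjectMapper.
        return new DefaultKafkaProducerFactory<>(
                configs, new StringSerializer(), new JsonSerializer<>(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```

With this approach the injected ObjectMapper is the same bean that FilterConfiguration modifies, so the filter provider should be visible to the serializer. The serializer instances passed to the factory are expected to take the place of the key-serializer and value-serializer classes named in application.yml.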
Votes: 1
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/69149467
