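I have a DTO class that needs to be sent to Kafka, but the class is annotated with @JsonFilter, so serialization fails. I tried configuring the ObjectMapper with a filter provider as shown below, but that does not work either.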
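import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FilterConfiguration {
    // Registers "PaymentFilter" on the ObjectMapper bean that Spring injects here.
    public FilterConfiguration(ObjectMapper objectMapper) {
        SimpleFilterProvider simpleFilterProvider = new SimpleFilterProvider().setFailOnUnknownId(true);
        simpleFilterProvider.addFilter("PaymentFilter", SimpleBeanPropertyFilter.serializeAll());
        objectMapper.setFilterProvider(simpleFilterProvider);
    }
}

Spring Boot version: 2.5.4

Spring Boot application.yml configuration: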
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      acks: "all"
      compression-type: "gzip"
      properties:
        retries: 10
        "enable.idempotence": true
    consumer:
      auto-offset-reset: earliest
      client-id: "machineName"
      isolation-level: READ_COMMITTED
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: ${spring.application.name}
      enable-auto-commit: false
      properties:
        "spring.json.trusted.packages": "com.lalashree.app.*"
        "group.instance.id": "machineName"

The full error stack trace is below:
org.apache.kafka.common.errors.SerializationException: Can't serialize data [PaymentInfoDTO(super=AbstractAuditDetailDTO(createdBy=null, creationTime=null, lastModifiedBy=null, lastModificationTime=null, deleted=null, deletedBy=null, deletionTime=null), id=ravi 01, customerId=null, orderId=null, txnAmount=null, status=null, txnToken=null, txnUrl=null)] for topic [payment_PaymentInfoDTO]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
reactor.core.publisher.Mono.flatMap(Mono.java:3047)
org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:137)
Error has been observed at the following site(s):
|_ Mono.flatMap ⇢ at org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:137)
|_ Mono.defer ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:199)
|_ Mono.then ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:199)
|_ Mono.doOnNext ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:200)
|_ Mono.doOnNext ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:201)
Stack trace:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot resolve PropertyFilter with id 'PaymentFilter'; no FilterProvider configured
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ser.std.StdSerializer.findPropertyFilter(StdSerializer.java:426) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFieldsFiltered(BeanSerializerBase.java:811) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:176) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1514) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ObjectWriter._writeValueAndClose(ObjectWriter.java:1215) ~[jackson-databind-2.12.4.jar:2.12.4]
at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1108) ~[jackson-databind-2.12.4.jar:2.12.4]
at org.springframework.kafka.support.serializer.JsonSerializer.serialize(JsonSerializer.java:195) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.support.serializer.JsonSerializer.serialize(JsonSerializer.java:185) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886) ~[kafka-clients-2.7.1.jar:na]
at brave.kafka.clients.TracingProducer.send(TracingProducer.java:129) ~[brave-instrumentation-kafka-clients-5.13.2.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:864) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:580) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:380) ~[spring-kafka-2.7.6.jar:2.7.6]
at com.lalashree.app.payment.web.rest.PaymentController.callback(PaymentController.java:83) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:146) ~[spring-webflux-5.3.9.jar:5.3.9]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:398) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:398) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:398) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:468) ~[reactor-netty-core-1.0.10.jar:1.0.10]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) ~[reactor-netty-core-1.0.10.jar:1.0.10]
at reactor.netty.channel.FluxReceive.request(FluxReceive.java:129) ~[reactor-netty-core-1.0.10.jar:1.0.10]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:498) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:498) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:498) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoCollect$CollectSubscriber.onSubscribe(MonoCollect.java:103) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:482) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:482) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:482) ~[reactor-core-3.4.9.jar:3.4.9]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.3.jar:3.0.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:167) ~[reactor-netty-core-1.0.10.jar:1.0.10]
at reactor.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:146) ~[reactor-netty-core-1.0.10.jar:1.0.10]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.67.Final.jar:4.1.67.Final]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]

Posted on 2021-09-12 08:32:33
After debugging, I found that the ObjectMapper instance I was configuring is not the same instance that Spring Kafka uses during serialization.
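To make that diagnosis concrete: when the serializer is created from the class name in value-serializer, spring-kafka's JsonSerializer builds its own ObjectMapper, so a filter registered on Spring Boot's mapper bean never reaches it. The sketch below is a stdlib-only toy model of that situation, not Jackson or spring-kafka code; ToyMapper, ToySerializer and attempt are hypothetical names made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

public class SeparateMapperDemo {

    /** Hypothetical stand-in for ObjectMapper: it only writes when the filter id is registered. */
    public static final class ToyMapper {
        private final Set<String> filterIds = new HashSet<>();

        public void addFilter(String id) {
            filterIds.add(id);
        }

        public String write(String filterId, String value) {
            if (!filterIds.contains(filterId)) {
                throw new IllegalStateException("Cannot resolve filter with id '" + filterId + "'");
            }
            return "{\"id\":\"" + value + "\"}";
        }
    }

    /** Hypothetical stand-in for a serializer whose no-arg constructor builds its own mapper. */
    public static final class ToySerializer {
        private final ToyMapper mapper;

        public ToySerializer() {
            this(new ToyMapper()); // a fresh mapper with no filters registered
        }

        public ToySerializer(ToyMapper mapper) {
            this.mapper = mapper;
        }

        public String serialize(String value) {
            return mapper.write("PaymentFilter", value);
        }
    }

    /** Returns the serialized text, or the error message if serialization fails. */
    public static String attempt(ToySerializer serializer, String value) {
        try {
            return serializer.serialize(value);
        } catch (IllegalStateException e) {
            return "ERROR: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        // The application configures this instance...
        ToyMapper appMapper = new ToyMapper();
        appMapper.addFilter("PaymentFilter");

        // ...but a serializer created reflectively from a class name never sees it.
        ToySerializer fromClassName = ToySerializer.class.getDeclaredConstructor().newInstance();
        System.out.println(attempt(fromClassName, "ravi 01"));

        // Handing the configured instance to the serializer makes the filter visible.
        System.out.println(attempt(new ToySerializer(appMapper), "ravi 01"));
    }
}
```

The first call reports the unresolved filter, mirroring the "no FilterProvider configured" error in the stack trace; the second succeeds because the serializer and the application share one mapper.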
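I found a solution: we need to customize the ObjectMapper that is used during serialization. To do that, we extend Spring's JsonSerializer and hand it an ObjectMapper instance that carries our configuration. The instance I configure is the default ObjectMapper provided by Spring Boot. So here is the solution (if you have a better suggestion, please share it):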
producer properties:
value-serializer: com.abcd.MyJsonSerializer
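import com.fasterxml.jackson.databind.JavaType;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class MyJsonSerializer<T> extends JsonSerializer<T> {
    // Kafka creates this class through the no-arg constructor, so the mapper is looked up statically.
    public MyJsonSerializer() {
        super((JavaType) null, StaticBeanUtil.getInstance().getObjectMapper());
    }
}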
@Component
@RequiredArgsConstructor
@Data
public class StaticBeanUtil implements InitializingBean {
    private static StaticBeanUtil INSTANCE;
    private final ApplicationContext applicationContext;
    private final ObjectMapper objectMapper;

    @Override
    public void afterPropertiesSet() throws Exception {
        INSTANCE = this;
    }

    public static StaticBeanUtil getInstance() {
        return INSTANCE;
    }

    public <T> T getBean(Class<T> beanClass) {
        return applicationContext.getBean(beanClass);
    }
}

https://stackoverflow.com/questions/69149467
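
One caveat with the static-holder approach above: StaticBeanUtil.getInstance() returns null until afterPropertiesSet() has run, so a MyJsonSerializer constructed before that point would fail with a NullPointerException. In practice the producer is usually created on first send, after the context has started, but the ordering is worth keeping in mind. The following stdlib-only sketch illustrates the pattern and that failure mode; MapperHolder, HolderBackedSerializer and createLikeAClient are hypothetical names, and a plain Function stands in for the ObjectMapper.

```java
import java.lang.reflect.InvocationTargetException;
import java.util.function.Function;

public class StaticHolderDemo {

    /** Hypothetical holder playing the role of StaticBeanUtil: filled once at startup. */
    public static final class MapperHolder {
        private static volatile Function<String, String> mapper;

        public static void init(Function<String, String> configured) {
            mapper = configured;
        }

        public static Function<String, String> get() {
            Function<String, String> current = mapper;
            if (current == null) {
                // Fail with a clear message instead of a bare NullPointerException.
                throw new IllegalStateException("MapperHolder is not initialized yet");
            }
            return current;
        }
    }

    /** Hypothetical serializer that can only be built through its no-arg constructor. */
    public static final class HolderBackedSerializer {
        private final Function<String, String> mapper;

        public HolderBackedSerializer() {
            this.mapper = MapperHolder.get();
        }

        public String serialize(String value) {
            return mapper.apply(value);
        }
    }

    /** Creates the serializer reflectively, the way a client does from a configured class name. */
    public static HolderBackedSerializer createLikeAClient() throws ReflectiveOperationException {
        return HolderBackedSerializer.class.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        try {
            createLikeAClient(); // holder is still empty at this point
        } catch (InvocationTargetException e) {
            System.out.println("too early: " + e.getCause().getMessage());
        }

        MapperHolder.init(value -> "{\"id\":\"" + value + "\"}");
        System.out.println(createLikeAClient().serialize("ravi 01"));
    }
}
```

Since JsonSerializer also has a constructor that accepts an ObjectMapper, another option that avoids the static lookup may be to build the serializer yourself and pass that instance to the producer factory, instead of naming a class in value-serializer.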