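I am trying to read CloudEvents messages in batch mode from Spring Cloud Stream using the Kafka binder. If I use a custom class with a custom serializer/deserializer it works fine, but with CloudEvents the messages never arrive.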
spring:
  cloud:
    function.definition: consumer
    stream:
      bindings:
        producer-out-0:
          destination: audit
          group: audit-producer
          producer:
            useNativeEncoding: true
        consumer-in-0:
          destination: audit
          group: audit-consumer
          consumer:
            batch-mode: true
            useNativeDecoding: true
      kafka:
        binder:
          brokers: localhost:9092
          consumer-properties:
            max.poll.records: 5
            fetch.min.bytes: 10000
            fetch.max.wait.ms: 10000
        bindings:
          producer-out-0:
            producer:
              configuration:
                cloudevents:
                  serializer:
                    encoding: STRUCTURED
                    event_format: application/cloudevents+json
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                # value.serializer: com.sagar.audit.watcher.domain.MessageSerializer
                value.serializer: io.cloudevents.kafka.CloudEventSerializer
          consumer-in-0:
            consumer:
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                # value.deserializer: com.sagar.audit.watcher.domain.MessageDeserializer
                value.deserializer: io.cloudevents.kafka.CloudEventDeserializer

For the consumer, I tried both a List and a single event:
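    @Bean
    public Consumer<List<CloudEvent>> consumer() {
        // Printed once, when the bean is created, not once per batch.
        System.out.println("inside consumer");
        // Single-event variant:
        // return auditMessage -> System.out.println("data at loop--" + thread + " -- " + auditMessage);
        // `thread` is defined elsewhere in the class (not shown here).
        return s -> s.forEach(auditMessage -> System.out.println("data at loop--" + thread + " -- " + auditMessage));
    }

If I use only the consumer, I get the following error, which means deserialization is happening, but somehow the messages are not passed on to the consumer.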
2022-10-26 20:31:24.070 WARN [,8289fada18f22581,831ea94d13ef311e] 64368 --- [container-0-C-1] s.c.f.c.c.SmartCompositeMessageConverter : Failure during type conversion by org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@3bf97caf. Will try the next converter.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:115) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176) ~[spring-messaging-5.3.23.jar:5.3.23]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.13.4.2.jar:2.13.4.2]

Posted on 2022-10-26 15:44:43
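It looks like you are using the CloudEvents SDK. I am not sure what value you are getting from it, since you can accomplish everything (and more) without it. Here are two blog posts where I discuss this:
https://spring.io/blog/2020/12/10/cloud-events-and-spring-part-1
https://spring.io/blog/2020/12/23/cloud-events-and-spring-part-2
In any case, if you still want to rely on SDK types such as CloudEvent, you may be missing a dependency: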
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-spring</artifactId>
    <version>2.3.0</version>
</dependency>

We also have an SDK-based sample, which may help: https://github.com/spring-cloud/spring-cloud-function/blob/main/spring-cloud-function-samples/function-sample-cloudevent-sdk/pom.xml
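Adding the dependency by itself may not be enough. In SDK-based setups the artifact is typically paired with a `CloudEventMessageConverter` bean, so that Spring's message conversion knows how to produce a `CloudEvent` instead of falling back to Jackson. The following is a minimal sketch of that registration, assuming `cloudevents-spring` is on the classpath. The configuration class name is hypothetical, and whether this converter also covers `batch-mode` payloads (a `List<CloudEvent>`) is not confirmed here.

```java
import io.cloudevents.spring.messaging.CloudEventMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only the bean below matters.
@Configuration
public class CloudEventConverterConfiguration {

    // Exposes the SDK's converter so Spring's message conversion can pick it up
    // when a function declares CloudEvent as its input or output type.
    @Bean
    public CloudEventMessageConverter cloudEventMessageConverter() {
        return new CloudEventMessageConverter();
    }
}
```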
Otherwise, the best thing you can do is create a small application that reproduces the issue, push it to GitHub, and send a link so we can take a look.
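One detail in the stack trace from the question may help narrow things down. The `Source: (String)"[CloudEvent{id='hello', ...}, CloudEvent{...}]"` text is not JSON; it has the shape that a Java `List` produces from `toString()`. That suggests the batch had already been deserialized into event objects and was then rendered as a string before the JSON converter tried to parse it. The sketch below reproduces only that string shape with the standard library; `FakeEvent` is a hypothetical stand-in, not the SDK's `CloudEvent`.

```java
import java.util.List;

public class BatchPayloadShape {

    // Hypothetical stand-in for an SDK event type; only its toString() matters here.
    static final class FakeEvent {
        private final String id;
        private final String type;

        FakeEvent(String id, String type) {
            this.id = id;
            this.type = type;
        }

        @Override
        public String toString() {
            return "CloudEvent{id='" + id + "', type='" + type + "'}";
        }
    }

    // Renders a batch the way String.valueOf(list) does: "[elem1, elem2]".
    static String render(List<FakeEvent> batch) {
        return String.valueOf(batch);
    }

    public static void main(String[] args) {
        List<FakeEvent> batch = List.of(
                new FakeEvent("hello", "example.kafka"),
                new FakeEvent("hello", "example.kafka"));
        // The result is bracketed and comma-separated like the log's Source string,
        // which is why a JSON parser rejects it.
        System.out.println(render(batch));
    }
}
```

If a small reproduction shows the same pattern, the place to look is where the already-typed batch gets turned into a string, rather than the Kafka deserializer itself.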
https://stackoverflow.com/questions/74210302