Batch consumer with CloudEvents not working with Kafka

Stack Overflow user
Asked 2022-10-26 15:24:42
1 answer, 93 views, 0 followers, 0 votes

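I am trying to read CloudEvents messages in batch mode from Spring Cloud Stream using the Kafka binder. If I use any custom class with a custom serializer/deserializer, it works fine, but with CloudEvents the messages never arrive.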

spring:
  cloud:
    function.definition: consumer
    stream:
      bindings:
        producer-out-0:
          destination: audit
          group: audit-producer
          producer:
            useNativeEncoding: true
        consumer-in-0:
          destination: audit
          group: audit-consumer
          consumer:
            batch-mode: true
            useNativeDecoding: true
      kafka:
        binder:
          brokers: localhost:9092
          consumer-properties:
            max.poll.records: 5
            fetch.min.bytes: 10000
            fetch.max.wait.ms: 10000
        bindings:
          producer-out-0:
            producer:
              configuration:
                cloudevents:
                  serializer:
                    encoding: STRUCTURED
                    event_format: application/cloudevents+json
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
#                value.serializer: com.sagar.audit.watcher.domain.MessageSerializer
                value.serializer: io.cloudevents.kafka.CloudEventSerializer
          consumer-in-0:
            consumer:
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
#                value.deserializer: com.sagar.audit.watcher.domain.MessageDeserializer
                value.deserializer: io.cloudevents.kafka.CloudEventDeserializer

For the consumer, I tried both a List and a single event:

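@Bean
public Consumer<List<CloudEvent>> consumer() {
  // Printed once, when the bean is created, not once per message.
  System.out.println("inside consumer");
  // Single-event variant (used with Consumer<CloudEvent>):
  //return auditMessage -> System.out.println("data at loop--" + thread + " -- " + auditMessage);
  // Batch variant: 'thread' is defined elsewhere in the class (not shown here).
  return s -> s.forEach(auditMessage -> System.out.println("data at loop--" + thread + " -- " + auditMessage));
}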

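If I use just the Consumer, I get the following error, which suggests that deserialization is happening but the messages are somehow not being passed to the consumer.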

2022-10-26 20:31:24.070  WARN [,8289fada18f22581,831ea94d13ef311e] 64368 --- [container-0-C-1] s.c.f.c.c.SmartCompositeMessageConverter : Failure during type conversion by org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@3bf97caf. Will try the next converter.

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:115) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176) ~[spring-messaging-5.3.23.jar:5.3.23]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.13.4.2.jar:2.13.4.2]
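
One detail in the trace is worth noting: the string Jackson is asked to parse begins with [CloudEvent{id='hello', ..., which has the shape of a list's toString() output rather than JSON. The JDK-only sketch below uses a hypothetical FakeEvent class as a stand-in for io.cloudevents.CloudEvent (it is not the SDK type) to show how such text arises. It only illustrates the symptom and does not reproduce the Spring Cloud Stream code path.

```java
import java.util.List;

public class ToStringIsNotJson {

    // Hypothetical stand-in for io.cloudevents.CloudEvent; NOT the SDK type.
    static final class FakeEvent {
        private final String id;
        private final String type;

        FakeEvent(String id, String type) {
            this.id = id;
            this.type = type;
        }

        // Mimics the "CloudEvent{...}" text seen in the stack trace.
        @Override
        public String toString() {
            return "CloudEvent{id='" + id + "', type='" + type + "'}";
        }
    }

    // Renders a batch the way List.toString() does: "[a, b]".
    static String render(List<FakeEvent> batch) {
        return batch.toString();
    }

    public static void main(String[] args) {
        List<FakeEvent> batch = List.of(
                new FakeEvent("hello", "example.kafka"),
                new FakeEvent("hello", "example.kafka"));
        // The result is bracketed and comma-separated, but the elements
        // are not JSON objects, so a JSON parser cannot map them to a type.
        System.out.println(render(batch));
    }
}
```

If this reading is right, the events were already deserialized by the Kafka deserializer, and the failure happens later, when the framework tries a JSON conversion on the stringified batch.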

1 Answer

Stack Overflow user

Accepted answer

Answered 2022-10-26 15:44:43

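It looks like you are using the CloudEvents SDK. I am not sure what value you are getting from it, since you can accomplish everything (and more) without it. Here are two posts where I discuss this:

https://spring.io/blog/2020/12/10/cloud-events-and-spring-part-1
https://spring.io/blog/2020/12/23/cloud-events-and-spring-part-2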

In any case, if you still want to rely on SDK types such as CloudEvent, you may be missing a dependency:

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-spring</artifactId>
   <version>2.3.0</version>
</dependency>
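
One way this dependency is typically wired in is by registering its message converter as a bean. A minimal sketch, assuming cloudevents-spring is on the classpath and that its io.cloudevents.spring.messaging.CloudEventMessageConverter is what lets Spring's conversion layer handle CloudEvent payloads. The configuration class name is hypothetical, and whether this resolves the batch-mode case in the question has not been verified:

```java
import io.cloudevents.spring.messaging.CloudEventMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name; requires cloudevents-spring on the classpath.
@Configuration
public class CloudEventConverterConfig {

    // Exposes the SDK's converter so Spring messaging can consider it
    // when mapping message payloads to CloudEvent.
    @Bean
    public CloudEventMessageConverter cloudEventMessageConverter() {
        return new CloudEventMessageConverter();
    }
}
```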

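We also have an SDK-based sample, which may help: https://github.com/spring-cloud/spring-cloud-function/blob/main/spring-cloud-function-samples/function-sample-cloudevent-sdk/pom.xml

Otherwise, the best thing you can do is create a small application that reproduces the problem, push it to GitHub, and send a link so we can take a look.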

0 votes
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/74210302
