我有一个基于Spring的小型原型,可以使用Protobuf将消息发布到Kafka集群。我正在使用汇合串行器:
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerio.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer我还将从Confluent (最新版本)运行schemas来处理Protobuf模式。一切都如期而至。
现在,我想介绍Cloudevents规范(https://github.com/cloudevents/spec),但是我很难理解它是如何与合流模式注册表一起工作的。
Cloudevents有一个sdk 模块来直接将消息序列化到Protobuf。消息的data部分是我的版本化的有效负载应该去的地方,但是没有办法只为消息的一部分定义模式。更清楚的是:
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withType("example.vertx")
.withSource(URI.create("http://localhost"))
.withData(???) <-- HERE IS WHERE MY PAYLOAD SHOULD BE VERSIONED
.build();一种解决方案是复制Cloudevent 模式,并在每个protobuf模式文件中简单地定义消息规范。这有一个缺点,就是我必须复制/粘贴每个新消息的Cloudevents原型模式。这将允许我使用标准Protobuf Kafka serde而不使用任何Cloudevent库。有没有更好的解决办法?
发布于 2022-02-25 17:23:11
如果您使用的是卡夫卡,您应该看看CloudEvents卡夫卡协议规范,这将是有自己的Kafka序列化程序类。
如果您阅读了这些内容,它将引用二进制datacontenttype,以及像application/cloudevents+avro这样的标头,它们可以以+protobuf作为后缀。
如果我正确地阅读了规范,那么Kafka值本身“必须”是JSON格式,而实际有效负载事件的数据可以被二进制编码(我猜是base64字符串吗?)因为JSON没有二进制类型)
基本上,您需要使用前面提到的类手动序列化Protobuf事件,并与Schema通信。然后把它放在CloudEvent记录中,最后使用一些"CloudEventSerializer“并生产.
然后在另一边执行相反的操作;从值中提取data有效负载,并将其传递给KafkaProtobufDeserializer.deserialize方法。
https://stackoverflow.com/questions/71264424
复制相似问题