我想用case class TimedGenericRecord(record: GenericRecord, timestamp: Long)创建一个StoreBuilder[KeyValueStore[String, TimedGenericRecord]]存储。
因此我需要创建一个Serde[TimedGenericRecord]。
对于GenericRecord,Kafka已经提供了Serde,Long也是如此。有没有办法为case类创建一个Serde并使用那些提供的Serde?因为在当前的设置中,您似乎只能反序列化一个完整的byte[],这不允许您重用所提供的Serde。
发布于 2019-02-14 12:09:28
取自Confluent Slack频道的回答:
在您的示例中,您知道
long被序列化为8个字节--因此,您将inputBytes[0]到inputBytes[inputBytes.lenght - 8]复制到一个新的字节数组中,该数组将提供给AvroDeserializer,副本的最后8个字节将提供给LongDeserializer。在序列化路径上,将avro和long序列化,然后将两者连接到一个返回的字节数组中。
https://stackoverflow.com/questions/54673212
复制相似问题