有什么简单的方法吗?如何将星星之火结构化的数据流保存到具有合流模式注册表的kafka中?星火版本为3.2.0,Scala2.12
我设法用一些难看的代码读取了来自Kafka的数据,并使用了汇合模式注册表:
val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistry, 128)
val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
val deserializer = kafkaAvroDeserializer
}
class AvroDeserializer extends AbstractKafkaAvroDeserializer {
def this(client: SchemaRegistryClient) {
this()
this.schemaRegistry = client
}
override def deserialize(bytes: Array[Byte]): String = {
val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
genericRecord.toString
}
}
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
DeserializerWrapper.deserializer.deserialize(bytes))```现在我想把数据写到另一个卡夫卡主题上--有没有一个简单的方法?
发布于 2022-01-29 14:07:42
您需要使用类似的丑陋代码,在Struct列(或原始类型)上使用序列化程序UDF。
有一些库可以帮助使它变得不那么丑陋- https://github.com/AbsaOSS/ABRiS
https://stackoverflow.com/questions/70905337
复制相似问题