首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark 3.2.0结构化流保存数据到Kafka与合流模式注册表

Spark 3.2.0结构化流保存数据到Kafka与合流模式注册表
EN

Stack Overflow用户
提问于 2022-01-29 12:24:12
回答 1查看 194关注 0票数 0

有什么简单的方法吗?如何将星星之火结构化的数据流保存到具有合流模式注册表的kafka中?星火版本为3.2.0,Scala2.12

我设法用一些难看的代码读取了来自Kafka的数据,并使用了汇合模式注册表:

代码语言:javascript
复制
  val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistry, 128)
  val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
  val deserializer = kafkaAvroDeserializer
}

class AvroDeserializer extends AbstractKafkaAvroDeserializer {
  def this(client: SchemaRegistryClient) {
    this()
    this.schemaRegistry = client
  }

  override def deserialize(bytes: Array[Byte]): String = {
    val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
    genericRecord.toString
  }
}

spark.udf.register("deserialize", (bytes: Array[Byte]) =>
  DeserializerWrapper.deserializer.deserialize(bytes))```

现在我想把数据写到另一个卡夫卡主题上--有没有一个简单的方法?

EN

回答 1

Stack Overflow用户

发布于 2022-01-29 14:07:42

您需要使用类似的丑陋代码,在Struct列(或原始类型)上使用序列化程序UDF。

有一些库可以帮助使它变得不那么丑陋- https://github.com/AbsaOSS/ABRiS

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70905337

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档