首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >尝试使用avro4s定义序列化程序,但遇到缺少的隐式错误

尝试使用avro4s定义序列化程序,但遇到缺少的隐式错误
EN

Stack Overflow用户
提问于 2019-05-07 21:58:43
回答 2查看 453关注 0票数 0

我使用的是flink(1.7) kafka客户端和Avro4s(2.0.4),我想序列化为字节数组:

代码语言:javascript
复制
class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends SerializationSchema[IN] {
  override def serialize(element: IN): Array[Byte] = {
    val str = AvroSchema[IN]
    val schema: Schema = new Parser().parse(str.toString)
    val out = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[IN].to(out).build(schema)
    os.write(element)
    out.close()
    out.flush()
    os.flush()
    os.close()
    out.toByteArray
  }
}

然而,我总是得到这样的异常:

代码语言:javascript
复制
Error:(15, 35) could not find implicit value for evidence parameter of type com.sksamuel.avro4s.Encoder[IN]
    val os = AvroOutputStream.data[IN].to(out).build(schema)

代码语言:javascript
复制
Error:(15, 35) not enough arguments for method data: (implicit evidence$3: com.sksamuel.avro4s.Encoder[IN])com.sksamuel.avro4s.AvroOutputStreamBuilder[IN].
Unspecified value parameter evidence$3.
    val os = AvroOutputStream.data[IN].to(out).build(schema)
EN

回答 2

Stack Overflow用户

发布于 2019-05-07 22:28:39

根据代码,IN必须是Encoder类型:

代码语言:javascript
复制
object AvroOutputStream {

  /**
    * An [[AvroOutputStream]] that does not write the schema. Use this when
    * you want the smallest messages possible at the cost of not having the schema available
    * in the messages for downstream clients.
    */   def binary[T: Encoder] = new AvroOutputStreamBuilder[T](BinaryFormat)

  def json[T: Encoder] = new AvroOutputStreamBuilder[T](JsonFormat)

  def data[T: Encoder] = new AvroOutputStreamBuilder[T](DataFormat)
}

所以它应该是这样的:

代码语言:javascript
复制
class AvroSerializationSchema[IN : Encoder] ...
票数 1
EN

Stack Overflow用户

发布于 2019-05-22 11:55:25

在写入输出流时,不需要使用FromRecord。这是为那些想要拥有自己使用的GenericRecord的人准备的。您需要使用Encoder

代码语言:javascript
复制
class AvroSerializationSchema[IN : SchemaFor : Encoder] extends SerializationSchema[IN] {
  override def serialize(element: IN): Array[Byte] = {
    val str = AvroSchema[IN]
    val schema: Schema = new Parser().parse(str.toString)
    val out = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[IN].to(out).build(schema)
    os.write(element)
    out.close()
    out.flush()
    os.flush()
    os.close()
    out.toByteArray
  }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56024318

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档