I'm using the Flink (1.7) Kafka client with Avro4s (2.0.4), and I want to serialize records to a byte array:

    class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord] extends SerializationSchema[IN] {
      override def serialize(element: IN): Array[Byte] = {
        val str = AvroSchema[IN]
        val schema: Schema = new Parser().parse(str.toString)
        val out = new ByteArrayOutputStream()
        val os = AvroOutputStream.data[IN].to(out).build(schema)
        os.write(element)
        out.close()
        out.flush()
        os.flush()
        os.close()
        out.toByteArray
      }
    }

However, compilation always fails with these errors:

    Error:(15, 35) could not find implicit value for evidence parameter of type com.sksamuel.avro4s.Encoder[IN]
        val os = AvroOutputStream.data[IN].to(out).build(schema)

and

    Error:(15, 35) not enough arguments for method data: (implicit evidence$3: com.sksamuel.avro4s.Encoder[IN])com.sksamuel.avro4s.AvroOutputStreamBuilder[IN].
    Unspecified value parameter evidence$3.
        val os = AvroOutputStream.data[IN].to(out).build(schema)

Posted on 2019-05-07 22:28:39

According to the avro4s source, an implicit Encoder must be available for IN:

    object AvroOutputStream {
      /**
        * An [[AvroOutputStream]] that does not write the schema. Use this when
        * you want the smallest messages possible at the cost of not having the schema available
        * in the messages for downstream clients.
        */
      def binary[T: Encoder] = new AvroOutputStreamBuilder[T](BinaryFormat)
      def json[T: Encoder] = new AvroOutputStreamBuilder[T](JsonFormat)
      def data[T: Encoder] = new AvroOutputStreamBuilder[T](DataFormat)
    }

So the class declaration should look like this:

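    class AvroSerializationSchema[IN : Encoder] ...

Posted on 2019-05-22 11:55:25

When writing to an output stream you do not need FromRecord. That type class is for people who want a GenericRecord for their own use. You need an Encoder instead.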
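
The compiler error comes from how context bounds work, and that can be seen without avro4s at all. Below is a minimal, library-free sketch. The Encoder trait, the Writer object and the Serializer class are hypothetical stand-ins invented for illustration; they are not the avro4s types and only mimic the shape of AvroOutputStream.data[T: Encoder].

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for com.sksamuel.avro4s.Encoder; NOT the real trait.
trait Encoder[T] {
  def encode(value: T): Array[Byte]
}

object Encoder {
  // An instance for String, found through the companion object's implicit scope.
  implicit val stringEncoder: Encoder[String] = new Encoder[String] {
    def encode(value: String): Array[Byte] = value.getBytes(StandardCharsets.UTF_8)
  }
}

// Hypothetical stand-in that mimics a method declared with a [T: Encoder] bound.
object Writer {
  // `T: Encoder` is sugar for an extra parameter list: (implicit ev: Encoder[T]).
  def data[T: Encoder](value: T): Array[Byte] = implicitly[Encoder[T]].encode(value)
}

// The `: Encoder` bound on IN passes the evidence down to Writer.data.
// Without it, the compiler knows nothing about IN inside the class body and
// reports "could not find implicit value for evidence parameter".
class Serializer[IN: Encoder] {
  def serialize(element: IN): Array[Byte] = Writer.data(element)
}
```

In this sketch, new Serializer[String] compiles because an Encoder[String] is in implicit scope, while new Serializer[Int] would be rejected where the class is instantiated, since no Encoder[Int] is defined. The generic class never resolves the instance itself; it only forwards whatever evidence its caller supplies.
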
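
Using Encoder, the class from the question becomes:

    import java.io.ByteArrayOutputStream
    import com.sksamuel.avro4s.{AvroOutputStream, AvroSchema, Encoder, SchemaFor}
    import org.apache.avro.Schema
    import org.apache.avro.Schema.Parser
    import org.apache.flink.api.common.serialization.SerializationSchema

    class AvroSerializationSchema[IN : SchemaFor : Encoder] extends SerializationSchema[IN] {
      override def serialize(element: IN): Array[Byte] = {
        val str = AvroSchema[IN]
        val schema: Schema = new Parser().parse(str.toString)
        val out = new ByteArrayOutputStream()
        val os = AvroOutputStream.data[IN].to(out).build(schema)
        os.write(element)
        // Flush and close the Avro stream before reading the buffer
        os.flush()
        os.close()
        out.toByteArray
      }
    }

https://stackoverflow.com/questions/56024318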