我有一个简单的案例课
case class KafkaContainer(key: String, payload: AnyRef)然后我想通过制片人把这个发给卡夫卡主题
val byteArrayStream = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
output.write(msg)
output.close()
val bytes = byteArrayStream.toByteArray
producer.send(new ProducerRecord("my_topic", msg.key, bytes))这件事很好用
然后我试着用这个
Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
.map { msg =>
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
input.close()
...
}.runWith(Sink.ignore)这与有效载荷中的任何类都能很好地工作。
但!如果它是AnyRef。消费者代码失败
错误:(38,96)找不到com.sksamuel.avro4s.FromRecordtest.messages.KafkaContainer val输入类型的证据参数的隐式值: AvroBinaryInputStreamKafkaContainer = AvroInputStream.binaryKafkaContainer 错误:(38,96)没有足够的参数用于方法二进制:(隐证据$21: com.sksamuel.avro4s.SchemaFortest.messages.KafkaContainer,隐式证据$22: com.sksamuel.avro4s.FromRecordtest.messages.KafkaContainer)com.sksamuel.avro4s.AvroBinaryInputStreamtest.messages.KafkaContainer.未指定的值参数证据$22。val输入: AvroBinaryInputStreamKafkaContainer = AvroInputStream.binaryKafkaContainer
如果我声明请求
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]它无法用
错误:(58,71)找不到com.sksamuel.avro4s.FromValueObject隐式fromRecord: FromRecordKafkaContainer = FromRecordKafkaContainer类型的延迟隐值 错误:(58,71)方法lazyConverter:(隐式fromValue: fromValue)的参数不足未指定的值参数fromValue。隐式val fromRecord: FromRecordKafkaContainer = FromRecordKafkaContainer
如果添加每一个隐含的编译器是必需的
lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]编译失败并出现错误
错误:(58,69)宏展开期间的异常: java.lang.IllegalArgumentException: requirement :需要一个case类,但对象不在scala.Predef$.require(Predef.scala:277)处的scala.Predef$.require,隐式val fromRecordObject: FromRecordObject = FromRecordObject
但是,如果我将AnyRef替换为某些类--不需要隐式的--那么一切都会恢复正常。
发布于 2019-01-23 11:41:16
我在使用任何数据类型时也有类似的问题。您必须指定该成员变量的哪些类型是有效的,因为Any或AnyRef可以是任何类型。然后使用任何一种或无形状的(也见Github文件)。就我的情况而言,它可以是字符串、长、双或空,所以使用无形状的可以:
case class DataContainer(name: String, value: Option[String:+:Long:+:Double:+:CNil])这将转换为AVRO中的联合类型:
{
"name" : "value",
"type" : [ "null", "string", "long", "double" ]
}https://stackoverflow.com/questions/51636647
复制相似问题