我们有以下依赖关系:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"我们使用代码生成器从AVRO模式文件中生成Scala case类。一个这样生成的case类有一个字段,就是其中一个值。在AVRO模式中,这是用type=t1,t2表示的,所以生成看起来很不错,这是一个sum类型:可以是t1类型,也可以是t2类型。
问题变成了在从topic到case类(二进制-> Avro Map -> case类)的反序列化路径上缺少什么。
基本上,我现在得到了这个错误:
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error] .stream[String, UserEvent]("schma.avsc")第一个想法是kafka-streams- AVRO -serde,但这个库可能只确保AVRO Map的SerdeGenericRecord,而不是case类。因此,另一个依赖项是帮助AVRO GenericRecord到case类的映射和返回。我们还有一些手工编写的代码,可以从模式中生成case类,这似乎可以直接与spray json一起工作。
我在想,在(二进制<-> Avro GenericRecord <-> case类实例)转换中,存在一个间隙,这可能是因为在case类中有一个字段?
我现在选择一个路径来尝试创建一个SerdeUserEvent实例。因此,在我的理解中,会涉及到在UserEvent和AVRO GenericRecord之间进行转换,类似于Map,然后在AVRO记录和二进制之间进行转换-这可能包含在kafka-GenericRecord-avro-serde依赖项中,就像应该有一个SerdeGenericRecord或类似的依赖关系。
导入方面,我们要导入隐含的内容:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed发布于 2020-04-09 21:30:55
事实上,缺少一个导入。现在它可以编译了。以下是导入:
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._发布于 2020-04-09 11:40:35
您是否导入了相应的包?
import org.apache.kafka.streams.scala.ImplicitConversions._请参阅https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-dsl
发布于 2020-08-25 12:14:34
对我来说,我必须更好地遵循the directions,并添加一个隐式的serde实现。他们在链接中的示例如下所示:
// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde有关更完整的示例,请参阅scala tests for their avro lib
// Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
implicit val genericAvroSerde: Serde[GenericRecord] = {
val gas = new GenericAvroSerde
val isKeySerde: Boolean = false
gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
gas
}https://stackoverflow.com/questions/61103804
复制相似问题