首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么我会得到这样的编译错误:“找不到kstream.Consumed的隐含值”,我如何修复它?

为什么我会得到这样的编译错误:“找不到kstream.Consumed的隐含值”,我如何修复它?
EN

Stack Overflow用户
提问于 2020-04-08 23:02:05
回答 3查看 943关注 0票数 2

我们有以下依赖关系:

代码语言:javascript
复制
libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"

我们使用代码生成器从AVRO模式文件中生成Scala case类。一个这样生成的case类有一个字段,就是其中一个值。在AVRO模式中,这是用type=t1,t2表示的,所以生成看起来很不错,这是一个sum类型:可以是t1类型,也可以是t2类型。

问题变成了在从topic到case类(二进制-> Avro Map -> case类)的反序列化路径上缺少什么。

基本上,我现在得到了这个错误:

代码语言:javascript
复制
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")

第一个想法是kafka-streams- AVRO -serde,但这个库可能只确保AVRO Map的SerdeGenericRecord,而不是case类。因此,另一个依赖项是帮助AVRO GenericRecord到case类的映射和返回。我们还有一些手工编写的代码,可以从模式中生成case类,这似乎可以直接与spray json一起工作。

我在想,在(二进制<-> Avro GenericRecord <-> case类实例)转换中,存在一个间隙,这可能是因为在case类中有一个字段?

我现在选择一个路径来尝试创建一个SerdeUserEvent实例。因此,在我的理解中,会涉及到在UserEvent和AVRO GenericRecord之间进行转换,类似于Map,然后在AVRO记录和二进制之间进行转换-这可能包含在kafka-GenericRecord-avro-serde依赖项中,就像应该有一个SerdeGenericRecord或类似的依赖关系。

导入方面,我们要导入隐含的内容:

代码语言:javascript
复制
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-04-09 21:30:55

事实上,缺少一个导入。现在它可以编译了。以下是导入:

代码语言:javascript
复制
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
票数 4
EN

Stack Overflow用户

发布于 2020-04-09 11:40:35

您是否导入了相应的包?

代码语言:javascript
复制
import org.apache.kafka.streams.scala.ImplicitConversions._

请参阅https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-dsl

票数 0
EN

Stack Overflow用户

发布于 2020-08-25 12:14:34

对我来说,我必须更好地遵循the directions,并添加一个隐式的serde实现。他们在链接中的示例如下所示:

代码语言:javascript
复制
// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde

有关更完整的示例,请参阅scala tests for their avro lib

代码语言:javascript
复制
    // Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
    implicit val genericAvroSerde: Serde[GenericRecord] = {
      val gas = new GenericAvroSerde
      val isKeySerde: Boolean = false
      gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
      gas
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61103804

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档