首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用替代项聚合的重载方法值

使用替代项聚合的重载方法值
EN

Stack Overflow用户
提问于 2018-01-08 14:52:04
回答 1查看 559关注 0票数 1

我有以下不能编译的函数:

代码语言:javascript
复制
  private def save(pea: KStream[String, String]): Unit = {
    pea
      .groupByKey()
      .aggregate(() => """{folder: ""}""",
        (_: String, _: String, value: String) => value,
        EventStoreTopology.Store)
  }

错误消息为:

代码语言:javascript
复制
[error]   [VR](x$1: org.apache.kafka.streams.kstream.Initializer[VR], x$2: org.apache.kafka.streams.kstream.Aggregator[_ >: String, _ >: String, VR], x$3: org.apache.kafka.streams.processor.StateStoreSupplier[org.apache.kafka.streams.state.KeyValueStore[_, _]])org.apache.kafka.streams.kstream.KTable[String,VR] <and>
[error]   [VR](x$1: org.apache.kafka.streams.kstream.Initializer[VR], x$2: org.apache.kafka.streams.kstream.Aggregator[_ >: String, _ >: String, VR], x$3: org.apache.kafka.common.serialization.Serde[VR])org.apache.kafka.streams.kstream.KTable[String,VR] <and>
[error]   [VR](x$1: org.apache.kafka.streams.kstream.Initializer[VR], x$2: org.apache.kafka.streams.kstream.Aggregator[_ >: String, _ >: String, VR], x$3: org.apache.kafka.streams.kstream.Materialized[String,VR,org.apache.kafka.streams.state.KeyValueStore[org.apache.kafka.common.utils.Bytes,Array[Byte]]])org.apache.kafka.streams.kstream.KTable[String,VR]
[error]  cannot be applied to (() => String, (String, String, String) => String, io.khinkali.eventstore.EventStoreTopology.Persistent)
[error]       .aggregate(() => """{folder: ""}""",
[error]        ^
[error] one error found
[error] (eventstore/compile:compileIncremental) Compilation failed 

aggregate签名为:

代码语言:javascript
复制
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

并且EventStoreTopology.Store被定义为:

代码语言:javascript
复制
object EventStoreTopology {

  type Persistent = Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]]
  val StoreName: String = "EventStore"
  val Store: Persistent = Materialized.as(StoreName)

}

我做错了什么?

EN

回答 1

Stack Overflow用户

发布于 2018-01-08 16:07:28

编译器需要一些帮助来推断aggregator参数的正确类型。

要使其编译,您可以尝试:

代码语言:javascript
复制
val store: Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]] = ???

private def save(pea: KStream[String, String]): Unit = {
  val aggregator: Aggregator[String, String, String] = (_, _, value: String) => value
  pea
    .groupByKey()
    .aggregate(() => """{folder: ""}""",
      aggregator,
      store)
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48145527

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档