首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache Flink: ProcessWindowFunction实现

Apache Flink: ProcessWindowFunction实现
EN

Stack Overflow用户
提问于 2018-11-30 06:05:20
回答 2查看 1.9K关注 0票数 6

我正在尝试使用Scala在我的Apache Flink项目中使用ProcessWindowFunction。不幸的是,我在实现Apache Flink文档中使用的基本ProcessWindowFunction时已经失败了。

这是我的代码:

代码语言:javascript
复制
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{NgsiEvent, OrionSource}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.util.Collector
import scala.collection.TraversableOnce

object StreamingJob {
 def main(args: Array[String]) {

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 val eventStream = env.addSource(new OrionSource(9001))

 val processedDataStream = eventStream.flatMap(event => event.entities)
   .map(entity => (entity.id, entity.attrs("temperature").value.asInstanceOf[String]))
     .keyBy(_._1)
     .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
     .process(new MyProcessWindowFunction())

 env.execute("Socket Window NgsiEvent")
 }
}


private class MyProcessWindowFunction extends ProcessWindowFunction[(String, String), String, String, TimeWindow] {

def process(key: String, context: Context, input: Iterable[(String, String)], out: Collector[String]): Unit = {
  var count: Int = 0
  for (in <- input) {
    count = count + 1
  }
  out.collect(s"Window ${context.window} count: $count")
 }
}

我从IntelliJ得到了以下提示:

1)创建新类对象的位置如下所示:

代码语言:javascript
复制
Type mismatch, expected: ProcessWindowFunction[(String, String), NotInferedR, String, TimeWindow], actual: MyProcessWindowFunction

2)这直接在类中显示:

代码语言:javascript
复制
Class 'MyProcessWindowFunction' must either be declared abstract or implement abstract member 'process(key:KEY, context:ProcessWindowFunction.Context, iterable:Iterable<IN>, collector:Collector<OUT>):void' in 'org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction'

构建代码会显示以下错误:

代码语言:javascript
复制
Error:(51, 16) type mismatch;
found   : org.apache.flink.MyProcessWindowFunction
required: 
org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[(String, String),?,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
  .process(new MyProcessWindowFunction())

我很感谢你的每一次帮助。

EN

回答 2

Stack Overflow用户

发布于 2018-12-06 19:04:38

在花了一些时间和另外两个人一起调试之后,我们终于找到了问题所在。

在我的代码中,我使用了以下导入:

代码语言:javascript
复制
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction

但在使用Scala时,正确的导入似乎是:

代码语言:javascript
复制
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
票数 7
EN

Stack Overflow用户

发布于 2019-12-02 20:09:57

代码语言:javascript
复制
//package of  ProcessWindowFunction is 
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction

//The correct way to call this method
new MyProcessWindowFunction()[(String, String), String, String, TimeWindow]

//I know the official documents don't.This may be a bug
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53548308

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档