
Apache Flink Wikipedia edit analysis in Scala

Stack Overflow user
Asked 2016-12-29 16:49:49
1 answer, 450 views, 0 followers, 0 votes

I'm trying to rewrite the Wikipedia edit stream analysis from the Apache Flink tutorial (quickstart.html) in Scala.

The code from the tutorial is:

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getUser();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

    result.print();

    see.execute();
  }
}
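For reference, the tutorial job keys the edit stream by user name, cuts each user's edits into 5-second tumbling windows, and folds every window into a (user, total byte diff) pair. The plain-Scala sketch below reproduces only that semantics on an in-memory batch, without Flink. `TimedEdit` and its `timestampMs` field are hypothetical stand-ins for `WikipediaEditEvent` and for the time Flink would use to assign an edit to a window.

```scala
// Hypothetical stand-in for WikipediaEditEvent: only the fields the job reads, plus a
// hypothetical timestamp used to place each edit in a 5-second window.
final case class TimedEdit(timestampMs: Long, user: String, byteDiff: Int)

object WindowFoldSketch {
  val WindowMs: Long = 5000L // corresponds to Time.seconds(5)

  // Start of the tumbling window that contains t (non-negative timestamps, zero offset).
  def windowStart(t: Long): Long = t - (t % WindowMs)

  // keyBy(user) + timeWindow(5 s) + fold(("", 0L), ...), computed over an in-memory batch:
  // one (user, summed byteDiff) result per (user, window start) pair.
  def sumPerUserPerWindow(edits: Seq[TimedEdit]): Map[(String, Long), (String, Long)] =
    edits
      .groupBy(e => (e.user, windowStart(e.timestampMs)))
      .map { case (key, windowEdits) =>
        key -> windowEdits.foldLeft(("", 0L)) { (acc, e) => (e.user, acc._2 + e.byteDiff) }
      }

  def main(args: Array[String]): Unit = {
    val edits = Seq(
      TimedEdit(1000L, "alice", 10),
      TimedEdit(2000L, "alice", -3),
      TimedEdit(6000L, "alice", 5),
      TimedEdit(3000L, "bob", 7)
    )
    // Print one line per (user, window start), sorted for a stable order.
    sumPerUserPerWindow(edits).toSeq.sortBy(_._1).foreach(println)
  }
}
```

The real job emits each window's result as the window closes on an unbounded stream; the sketch simply computes all windows of a finite batch at once.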

Below is my attempt in Scala:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time


object WikipediaAnalytics extends App{

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val edits = env.addSource(new WikipediaEditsSource());

  val keyedEdits = edits.keyBy(event => event.getUser)

  val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
    (we.getUser, t._2 + we.getByteDiff))

}

This is more or less a word-for-word translation to Scala, so the type of val result should be DataStream[(String, Long)], but the type actually inferred after fold() is nowhere close.

Please help me identify what is wrong with the Scala code.
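The mismatch comes down to how the Scala fold is declared: as far as I know, the windowed stream's Scala `fold` is curried, taking the initial value in one parameter list and an `(accumulator, element) => accumulator` function in a second one. The sketch below copies only that shape onto a hypothetical `MiniWindow` class that folds an in-memory `Seq`; it is not Flink's code, but it shows how `R` is fixed by the first list so that the lambda's parameter types can be inferred.

```scala
object CurriedFoldSketch {
  // Hypothetical element type standing in for WikipediaEditEvent.
  final case class Edit(user: String, byteDiff: Int)

  // Hypothetical class that copies only the *shape* of a curried fold: initial value in the
  // first parameter list, (accumulator, element) => accumulator in the second.
  // It folds an in-memory Seq; it is not Flink's implementation.
  final class MiniWindow[T](elems: Seq[T]) {
    def fold[R](initialValue: R)(function: (R, T) => R): R =
      elems.foldLeft(initialValue)(function)
  }

  val window = new MiniWindow(Seq(Edit("alice", 10), Edit("alice", -3)))

  // R is inferred as (String, Long) from the first list, so the compiler already knows the
  // lambda's parameter types: the accumulator first, then the element.
  val result: (String, Long) =
    window.fold(("", 0L))((acc, e) => (e.user, acc._2 + e.byteDiff))

  // The question's shape, left commented out: both arguments in ONE parameter list, and the
  // lambda ordered (element, accumulator). Neither matches the curried signature above.
  // window.fold(("", 0L), (e: Edit, acc: (String, Long)) => (e.user, acc._2 + e.byteDiff))

  def main(args: Array[String]): Unit = println(result)
}
```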

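EDIT1: Working from the signature of fold[R], I made the following changes. The type is now confirmed to be the expected one, but I still can't work out why.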

  val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
    keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))

  val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent ) =>
    (we.getUser, t._2 + we.getByteDiff))
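EDIT1 type-checks because supplying only the first parameter list, with a function type expected, lets the Scala compiler eta-expand the remaining parameter list into a function value; applying that value to the fold function is equivalent to the one-step curried call. A minimal sketch with a hypothetical `MiniWindow` stand-in (not Flink's API):

```scala
object EtaExpansionSketch {
  // Hypothetical stand-ins: an edit with only the fields the fold uses, and a class whose
  // fold has the same curried shape (initial value, then function) as the Scala fold.
  final case class Edit(user: String, byteDiff: Int)

  final class MiniWindow[T](elems: Seq[T]) {
    def fold[R](initialValue: R)(function: (R, T) => R): R =
      elems.foldLeft(initialValue)(function)
  }

  val window = new MiniWindow(Seq(Edit("bob", 4), Edit("bob", 6)))

  // Only the first parameter list is supplied; because a function type is expected, the
  // compiler eta-expands the rest of the method into a function value (like result_1).
  val step1: (((String, Long), Edit) => (String, Long)) => (String, Long) =
    window.fold(("", 0L))

  // Applying that function value to the fold function (like result_2) ...
  val viaStep1: (String, Long) = step1((acc, e) => (e.user, acc._2 + e.byteDiff))

  // ... gives the same result as writing both parameter lists in one call.
  val direct: (String, Long) = window.fold(("", 0L))((acc, e) => (e.user, acc._2 + e.byteDiff))

  def main(args: Array[String]): Unit = println(s"$viaStep1 $direct")
}
```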

1 Answer

Stack Overflow user

Accepted answer

Answered 2016-12-29 22:07:22

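The problem seems to be the fold call: you need a closing parenthesis right after your accumulator's initialValue, because the Scala fold takes the initial value and the fold function in two separate parameter lists. Once you fix that, the code still won't compile, because no TypeInformation is available for WikipediaEditEvent. The easiest way to solve that is to import more of the Flink Scala API (import org.apache.flink.streaming.api.scala._). See the complete example below: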

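// Wildcard import: DataStream, StreamExecutionEnvironment and the implicit TypeInformation support
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time

object WikipediaAnalytics extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Unbounded stream of WikipediaEditEvent from the wikiedits connector
  val edits = see.addSource(new WikipediaEditsSource())
  val userEditsVolume: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)            // partition the stream by user name
    .timeWindow(Time.seconds(5)) // 5-second tumbling windows per user
    // Curried fold: initial accumulator in the first list, (acc, event) => acc in the second
    .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))
  userEditsVolume.print()
  see.execute("Wikipedia User Edit Volume")
}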
Votes: 1
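The answer's second point, the missing TypeInformation, is a Scala context bound at work: as I understand it, Flink's Scala API declares methods such as `addSource` and `fold` with a bound like `[R: TypeInformation]`, and `import org.apache.flink.streaming.api.scala._` brings an implicit `createTypeInformation` macro into scope that supplies it. The sketch below reproduces only that mechanism with a hypothetical `TypeInfo` type class, a hypothetical `api` object and a hypothetical `typeNameOf` method; none of these names are Flink's.

```scala
import scala.reflect.ClassTag

object ContextBoundSketch {
  // Hypothetical type class playing the role of Flink's TypeInformation[T].
  trait TypeInfo[T] { def name: String }

  // Hypothetical "API package": importing its members brings an implicit TypeInfo for any
  // type with a ClassTag into scope, loosely like Flink's wildcard Scala import.
  object api {
    implicit def anyTypeInfo[T](implicit ct: ClassTag[T]): TypeInfo[T] =
      new TypeInfo[T] { def name: String = ct.runtimeClass.getSimpleName }
  }

  // Context bound: `R: TypeInfo` means every caller needs an implicit TypeInfo[R] in scope,
  // just as a method declared with [R: TypeInformation] does.
  def typeNameOf[R: TypeInfo](value: R): String = implicitly[TypeInfo[R]].name

  def demo(): String = {
    import api._ // without this import, the call below fails with "could not find implicit value"
    typeNameOf(("", 0))
  }

  def main(args: Array[String]): Unit = println(demo())
}
```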
Page content originally provided by Stack Overflow; translation support by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/41383878
