首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink 1.5 DataStream:重复过滤

Flink 1.5 DataStream:重复过滤
EN

Stack Overflow用户
提问于 2018-07-01 23:02:04
回答 1查看 621关注 0票数 0

我有一个未绑定的DataStream,它代表了社交网络中的友谊。这些友谊可以是双向的,因此在流中出现两次。

数据的结构是: timestamp|user1|user2。例如:

代码语言:javascript
复制
2010-03-09T02:51:11.571+0000|143|1219
2010-03-09T06:08:51.942+0000|1242|4624
2010-03-09T08:24:03.773+0000|2191|4986
2010-03-09T09:37:09.788+0000|459|4644

我想删除双向友谊,以便只计算一次。在实践中,我希望过滤器重复。我找到了一个解决方案here

我的FilterFunction看起来像这样:

代码语言:javascript
复制
   def filter(ds: DataStream[String]): DataStream[(String, String, String)] = {

    val res = data.
      mapWith(line => {
        val str = line.split("\\|")
        if (str(1).toLong > str(2).toLong)
          (str(0), str(1), str(2))
        else
           (str(0), str(2), str(1))
      })
       .keyBy(tuple => (tuple._2, tuple._3))
      .flatMap(new FilterFunction())

    res
  }

我将我的RichFlatMapFunction实现为:

代码语言:javascript
复制
class FilterFunction extends RichFlatMapFunction[(String, String, String), (String, String, String)] {

  private var seen: ValueState[Boolean] = _

  override def flatMap(value: (String, String, String), out: 
Collector[(String, String, String)]): Unit = {

     if (!seen.value() || seen.value() == null) {
       seen.update(true)
       out.collect(value)
     }
   }

  override def open(parameters: Configuration): Unit = {
     seen = getRuntimeContext.getState(
       new ValueStateDescriptor("seen", classOf[Boolean])
     )
   }
}

然而,当我打印时,我得到的结果是随机的。我尝试在1年的时间窗口内执行计数:

代码语言:javascript
复制
val da1 = filter(data)
  .mapWith(tuple => Parser.parseUserConnection(tuple).get)
  .assignAscendingTimestamps(connection => connection.timestamp.getMillis)
  .mapWith(connection => (connection, 1))
  .timeWindowAll(Time.days(365))
  .sum(1)
  .mapWith(tuple => tuple._2)
  .print()

我的控制台第一次打印:

代码语言:javascript
复制
1> 33735

然后:

代码语言:javascript
复制
1> 10658
2> 33735

对于随后的执行,不同的结果(只有33735似乎是稳定的)。我不能理解这种奇怪的行为。

EN

回答 1

Stack Overflow用户

发布于 2018-07-02 04:32:42

很难追随你所发现的令人惊讶的东西。但调试这类应用程序的一般技术是打印流水线不同阶段的结果,看看结果在什么时候变得奇怪。或者在IDE中调试作业并逐步执行它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51124439

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档