首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SlidingWindows Python复制数据

SlidingWindows Python复制数据
EN

Stack Overflow用户
提问于 2018-11-14 15:56:56
回答 2查看 833关注 0票数 1

问题

每次系统收到带有滑动Windows的pubsub消息时,都会被复制

代码

代码语言:javascript
复制
 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))    
 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

输出

如果我只从pub/sub发送一条消息,并试图在滑动窗口用代码结束后打印我拥有的内容:

代码语言:javascript
复制
class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
        print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))

结果

代码语言:javascript
复制
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000

如果我在'window' >> beam.WindowInto(window.SlidingWindows(30, 15))之前打印消息,我只收到一次

以“图形模式”进程

代码语言:javascript
复制
  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=X===========|       :       :
  w2:               |==============|       :
  ...

消息X在滑动窗口开始时只发送了一次,应该只接收一次,但正在接收两次。

我尝试了两个AccumulationMode值,也使用了一个trigger=AftyerWatermark,但是我无法解决这个问题。

,怎么回事?

额外

使用FixedWindows,这是我的豪宅的正确代码:

代码语言:javascript
复制
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

代码语言:javascript
复制
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
EN

回答 2

Stack Overflow用户

发布于 2018-11-14 21:13:47

将发出属于该窗口的所有元素。如果一个元素属于多个窗口,那么它将在每个窗口中发出。

只有在计划处理延迟数据/多个触发触发时,积累模式才会起作用。在这种情况下,当触发器再次触发时,丢弃模式只提供窗口中的新元素,即只发出上次触发器触发后到达同一窗口的元素,已经发出的元素不会再次发出并被丢弃。在累积模式下,每次触发时都会发出整个窗口,包括上次已经发出的旧元素和此后到达的新元素。

如果我理解你的例子,你有滑动窗口,它们有30秒的长度,它们每15秒启动一次。所以它们重叠了15秒:

代码语言:javascript
复制
  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=============|       :       :
  w2:               |==============|       :
  w3:                      |===============|
  ...

因此,在您的示例中,任何元素都至少属于两个窗口(除了第一个窗口和最后一个窗口)。

在你的例子中,如果你的消息是在17:07:15和17:07:30之间发送的,它将出现在两个窗口中。

固定窗口不重叠,因此元素只能属于一个窗口:

代码语言:javascript
复制
  time: ----t+00---t+15---t+30----t+45----t+60------>
             :             :               :
  w1:        |=============|               :
  w2:                      |===============|
  w3:                                      |====...
  ...

更多关于windows的信息:https://beam.apache.org/documentation/programming-guide/#windowing

票数 0
EN

Stack Overflow用户

发布于 2020-09-15 21:29:57

不过,在java中,我也有同样的问题。我有一个10秒的窗口和3秒的步骤。当从我订阅的mqtt主题发出一个事件时,它看起来就像我已经运行的ParDo函数,并向所有三个“构造”窗口发出第一个和唯一的事件。

X是我随机发送的事件: 2020-09-15T21:17:57.292Z

代码语言:javascript
复制
  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |X============|       :       :
  w2:               |X=============|       :
  w3:                      |X==============|
  ...

甚至同样的时间戳也被分配给他们!!我一定是做错了什么事。

我使用Scala 2.12和BEAM 2.23与一个直接运行程序。

提示我在processElement函数中使用状态!按键+窗口保持状态的位置。也许那里有个窃听器?我会试着在没有状态的情况下测试它。

更新:删除了状态字段,并将单个事件分配给一个窗口。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53304174

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档