问题
每次系统收到带有滑动Windows的pubsub消息时,都会被复制
代码
| 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))
| 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
| 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())输出
如果我只从pub/sub发送一条消息,并试图在滑动窗口用代码结束后打印我拥有的内容:
class print_row2(beam.DoFn):
def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))结果
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000如果我在'window' >> beam.WindowInto(window.SlidingWindows(30, 15))之前打印消息,我只收到一次
以“图形模式”进程:
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |=X===========| : :
w2: |==============| :
...消息X在滑动窗口开始时只发送了一次,应该只接收一次,但正在接收两次。
我尝试了两个AccumulationMode值,也使用了一个trigger=AftyerWatermark,但是我无法解决这个问题。
,怎么回事?
额外
使用FixedWindows,这是我的豪宅的正确代码:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())或
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())发布于 2018-11-14 21:13:47
将发出属于该窗口的所有元素。如果一个元素属于多个窗口,那么它将在每个窗口中发出。
只有在计划处理延迟数据/多个触发触发时,积累模式才会起作用。在这种情况下,当触发器再次触发时,丢弃模式只提供窗口中的新元素,即只发出上次触发器触发后到达同一窗口的元素,已经发出的元素不会再次发出并被丢弃。在累积模式下,每次触发时都会发出整个窗口,包括上次已经发出的旧元素和此后到达的新元素。
如果我理解你的例子,你有滑动窗口,它们有30秒的长度,它们每15秒启动一次。所以它们重叠了15秒:
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |=============| : :
w2: |==============| :
w3: |===============|
...因此,在您的示例中,任何元素都至少属于两个窗口(除了第一个窗口和最后一个窗口)。
在你的例子中,如果你的消息是在17:07:15和17:07:30之间发送的,它将出现在两个窗口中。
固定窗口不重叠,因此元素只能属于一个窗口:
time: ----t+00---t+15---t+30----t+45----t+60------>
: : :
w1: |=============| :
w2: |===============|
w3: |====...
...更多关于windows的信息:https://beam.apache.org/documentation/programming-guide/#windowing
发布于 2020-09-15 21:29:57
不过,在java中,我也有同样的问题。我有一个10秒的窗口和3秒的步骤。当从我订阅的mqtt主题发出一个事件时,它看起来就像我已经运行的ParDo函数,并向所有三个“构造”窗口发出第一个和唯一的事件。
X是我随机发送的事件: 2020-09-15T21:17:57.292Z
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |X============| : :
w2: |X=============| :
w3: |X==============|
...甚至同样的时间戳也被分配给他们!!我一定是做错了什么事。
我使用Scala 2.12和BEAM 2.23与一个直接运行程序。
提示我在processElement函数中使用状态!按键+窗口保持状态的位置。也许那里有个窃听器?我会试着在没有状态的情况下测试它。
更新:删除了状态字段,并将单个事件分配给一个窗口。
https://stackoverflow.com/questions/53304174
复制相似问题