我正在使用一个梁管道来计数频率的电话号码流数据。我使用滑动窗口,每5分钟重复一次,总周期为15分钟,因此,对于某些输入,当输入落在多个窗口中时,我将得到多个输出。
在计算了出现的次数之后,我希望找到输入特性的平均值。输入是元组,如下所示:
('phone_number', '123')
('phone_number', '456')
('phone_number', '456')
('phone_number', '456')管道的第一部分是计算每个数字的频率:
| 'window' >> beam.WindowInto(window.SlidingWindows(900, 300))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'count_occurences' >> beam.combiners.Count.PerKey()正确的计算是正确的,我可以计算每个数字的频率,得到3个结果,因为每个时间段有3个滑动窗口(在我们的例子中,456个呼叫中有2个位于同一个窗口,第三个位于不同的窗口):
(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)现在,我想在计算的所有窗口值上找到每个电话号码的平均值,即:
(('phone_number', '123'), 1.0)
(('phone_number', '456'), 1.5)我的下一步是
| 'Find Means' >> beam.combiners.Mean.PerKey()但这只会让我:
(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)有没有办法对上一次的结果进行另一次beam.combiners计算?
发布于 2020-03-18 18:50:45
beam.combiners.Mean.PerKey()给出的输出不正确,原因是组合器给出了为每个key+window计算的单个值。
然而,这里还有更多。流处理中窗口的原因是为了确保在生成结果之前输入是有限制的。也就是说,流管道的输入通常是无界的,这意味着除非管道被终止,否则它们永远不会停止接收数据。因此,不可能在所有窗口中计算一个值,因为您需要永远等待。
在我看来,你似乎是在试图计算“15分钟窗口中出现的电话号码的平均次数,当比较所有可能的滑动15分钟窗口时,通过滑动它5分钟”。如果不是这样的话,请澄清以帮助我理解
由于我们需要以某种方式绑定计算,所以可能会周期性地输出结果,即每个窗口,并不断输出新的结果,直到管道结束。这在StatefulDoFn中应该是可能的。
为此,我建议:
就像这样:
class ComputeMeanStatefulDoFn(DoFn):
TOTAL_STATE = CombiningStateSpec('total', sum)
COUNT_STATE = CombiningStateSpec('count', sum)
def process(self, element,
total=DoFn.StateParam(TOTAL_STATE),
count=DoFn.StateParam(COUNT_STATE)):
key_phone_number, value_window_count = element
current_count = count.read() + 1
current_total = total.read() + value_window_count
mean = current_total / current_count
# You can emit every N results to reduce the volume
# but please make sure to at least emit the first M << N results
yield (key_phone_number, mean)
total.add(value_window_count)
count.add(1)
| 'window' >> beam.WindowInto(window.SlidingWindows(900, 300))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'count_occurences' >> beam.combiners.Count.PerKey()
| 'window_globally' >> beam.WindowInto(window.GlobalWindows)
| 'compute_mean_across_windows' >> beam.ParDo(ComputeMeanStatefulDoFn)本质上,这里发生的事情是,和被存储到persistance/disk中,我们每次新元素到达全局窗口时都重新计算一个新的平均值。
注意:您需要处理多次发出相同键的更新平均值。也就是说,您可能希望覆盖包含结果的BigQuery表中的一行。
注意:根据您要计算的语义,您可能希望从SlidingWindows函数中发出空窗口,以便将它们包含在下游的均值计算中。
注意:您不能在这里使用Combine.globally,因为这不会终止,这是因为流管道中的输入是无界的。我相信,如果你试图启动这样的管道,这可能会出错。
https://stackoverflow.com/questions/60738647
复制相似问题