number_items = lines | 'window' >> beam.WindowInto(window.GlobalWindows()) \
| 'CountGlobally' >> beam.combiners.Count.Globally() \
| 'print' >> beam.ParDo(PrintFn())我试着用指纹和日志来显示,但什么也没找到
class PrintFn(beam.DoFn):
def process(self, element):
print(element)
logging.error(element)
return [element]发布于 2019-09-12 14:36:11
对于批处理,您可以简单地
def print_row(element):
print element
count_pcol = (
lines
| 'Count elements' >> beam.combiners.Count.Globally()
| 'Print result' >> beam.Map(print_row)
)beam.combiners.Count.Globally()是一个PTransform,它使用全局组合来计数PCollection的所有元素并生成单个值。
对于流,计数元素是不可能的,因为源是一个无界的pcollection,也就是说它永远不会结束。在您的情况下,CombineGlobally将继续等待输入,永远不会产生输出。
一种可能的解决方案是设置一个窗口函数和一个非默认触发器。
我已经编写了一个简单的管道,它将固定窗口中的元素划分为20秒,并对每个窗口的每个键进行计数。您可以根据您的需求更改窗口和触发器。
def form_pair(data):
return 1, data
def print_row(element):
print element
count_pcol = (
p
| 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
| 'Form key value pair' >> beam.Map(form_pair)
| 'Apply windowing and triggers' >>
beam.WindowInto(window.FixedWindows(20),
trigger=AfterProcessingTime(5),
accumulation_mode=AccumulationMode.DISCARDING)
| 'Count elements by key' >> beam.combiners.Count.PerKey()
| 'Print result' >> beam.Map(print_row)
)发布于 2019-09-11 18:32:17
我发现奇怪的是,我想要计数一个无限集合的元素。我的第一感觉是,永远不要去全球窗口,因为梁等待结束在无限的集合.只是你做了个扳机。
深入了解文档,我发现了这个
设置非默认触发器。这允许全局窗口在其他条件下发出结果,因为默认的窗口行为(等待所有数据到达)永远不会发生。
我是对的,有了扳机,结局永远不会发生,它是无限的,无限的。
你是否试图跳过窗口,直接计算全球范围?
https://stackoverflow.com/questions/57890961
复制相似问题