首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何计算Apache光束中PCollection的元素数

如何计算Apache光束中PCollection的元素数
EN

Stack Overflow用户
提问于 2019-09-11 14:05:44
回答 2查看 5.1K关注 0票数 0
代码语言:javascript
复制
number_items = lines | 'window' >> beam.WindowInto(window.GlobalWindows()) \
    | 'CountGlobally' >> beam.combiners.Count.Globally() \
    | 'print' >> beam.ParDo(PrintFn())

我试着用指纹和日志来显示,但什么也没找到

代码语言:javascript
复制
class PrintFn(beam.DoFn):
    def process(self, element):
        print(element)
        logging.error(element)
        return [element]
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-09-12 14:36:11

对于批处理,您可以简单地

代码语言:javascript
复制
def print_row(element):
  print element

count_pcol = (
              lines
              | 'Count elements' >> beam.combiners.Count.Globally()
              | 'Print result' >> beam.Map(print_row)
            )

beam.combiners.Count.Globally()是一个PTransform,它使用全局组合来计数PCollection的所有元素并生成单个值。

对于,计数元素是不可能的,因为源是一个无界的pcollection,也就是说它永远不会结束。在您的情况下,CombineGlobally将继续等待输入,永远不会产生输出。

一种可能的解决方案是设置一个窗口函数和一个非默认触发器。

我已经编写了一个简单的管道,它将固定窗口中的元素划分为20秒,并对每个窗口的每个键进行计数。您可以根据您的需求更改窗口和触发器。

代码语言:javascript
复制
def form_pair(data):
  return 1, data

def print_row(element):
      print element

count_pcol = (
                p 
                | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                | 'Form key value pair' >> beam.Map(form_pair)
                | 'Apply windowing and triggers' >> 
                                       beam.WindowInto(window.FixedWindows(20),
                                       trigger=AfterProcessingTime(5), 
                                       accumulation_mode=AccumulationMode.DISCARDING)
                | 'Count elements by key' >> beam.combiners.Count.PerKey()
                | 'Print result' >> beam.Map(print_row)
               )
票数 1
EN

Stack Overflow用户

发布于 2019-09-11 18:32:17

我发现奇怪的是,我想要计数一个无限集合的元素。我的第一感觉是,永远不要去全球窗口,因为梁等待结束在无限的集合.只是你做了个扳机。

深入了解文档,我发现了这个

设置非默认触发器。这允许全局窗口在其他条件下发出结果,因为默认的窗口行为(等待所有数据到达)永远不会发生。

我是对的,有了扳机,结局永远不会发生,它是无限的,无限的。

你是否试图跳过窗口,直接计算全球范围?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57890961

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档