首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法在数据流中查看beam.combiners.Count.PerElement()的输出

无法在数据流中查看beam.combiners.Count.PerElement()的输出
EN

Stack Overflow用户
提问于 2020-03-26 23:14:06
回答 1查看 215关注 0票数 0

我有一个Pub/Sub脚本,发布男性名字如下:

代码语言:javascript
复制
from google.cloud import pubsub_v1
import names

project_id = "Your-Project-Name"
topic_name = "Your-Topic-Name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

while True:
    data = names.get_first_name(gender='male') #u"Message number {}".format(n)
    data = data.encode("utf-8")
    publisher.publish(topic_path, data=data)

然后,我有一个Dataflow,它从附加到主题的订阅中读取,然后按如下方式计算管道的每个元素:

代码语言:javascript
复制
import logging,re,os
import apache_beam as beam
from apache_beam.options.pipeline_options import  PipelineOptions

root = logging.getLogger()
root.setLevel(logging.INFO)

p = beam.Pipeline(options=PipelineOptions())
x = (
 p
 | beam.io.ReadFromPubSub(topic=None, subscription="projects/YOUR-PROJECT-NAME/subscriptions/YOUR-SUBSCRIPTION-NAME").with_output_types(bytes)
 | 'Decode_UTF-8' >> beam.Map(lambda x: x.decode('utf-8'))
 | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
 | 'CountingElem' >> beam.combiners.Count.PerElement()
 | 'FormatOutput' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
 | 'Printing2Log' >> beam.Map(lambda k: logging.info(k)))

result = p.run()
result.wait_until_finish()

问题是:我无法从管道的最后三个步骤中得到任何输出,而我可以看到数据从管道的前三个步骤中流动--这意味着没有任何记录。

我期望输出是这样的:

代码语言:javascript
复制
Peter: 2
Glen: 1
Alex: 1
Ryan: 2

我已经感谢你帮助我了

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-28 00:16:18

考虑到这是一个流管道,您需要适当地设置窗口/触发才能使管道工作。见下文。https://beam.apache.org/documentation/programming-guide/#windowing

更具体而言:

警告:Beam的默认窗口行为是将PCollection的所有元素分配给单个全局窗口,并丢弃后期数据,即使对于无界PCollections也是如此。在对无界的GroupByKey使用分组转换(如PCollection )之前,必须至少执行以下操作之一:

beam.combiners.Count.PerElement()中包含一个GroupByKey

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60877626

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档