查看下面的代码块,我没有看到expand方法曾经被下游调用过。
class ReadWordsFromText(beam.PTransform):
def __init__(self, file_pattern):
self._file_pattern = file_pattern
def expand(self, pcoll):
return (pcoll.pipeline
| beam.io.ReadFromText(self._file_pattern)
| beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
p = beam.Pipeline(InteractiveRunner())
words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
counts = (words
| 'count' >> beam.combiners.Count.PerElement())
lower_counts = (words
| "lower" >> beam.Map(lambda word: word.lower())
| "lower_count" >> beam.combiners.Count.PerElement())它会在创建words实例时自动触发吗?(在这种情况下,我试图在Apache光束的上下文中理解Python的一般情况)
words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')发布于 2021-02-12 09:29:08
此类继承自beam.PTransform类(未显示)。
为了让它支持自定义运算符,在他们使用的风格是在混乱下面,它必须做很多事情。语言本身调用的唯一方法包含在__前缀和后缀中,比如用于实现带有| (__or__)和>> (__rshift__)运算符的行为的方法(请参阅Data Model文档中列出的完整魔术方法)。这个超类必须实现这些,并且它们中的任何一个都可能调用expand方法。
发布于 2021-02-14 07:33:39
只要编写input | transform,就应该调用转换的expand()方法。我很惊讶您在编写words = p | 'read' >> ReadWordsFromText(...)时没有看到它被调用(它为我做了;试着用那个方法打印一些东西)。
然而,请注意,实际执行流水线(例如,对结果数据执行读取和后续FlatMap)是延迟的。您需要执行p.run(),或者,如果您试图以交互方式使用东西,则需要使用interactive_beam.collect()来触发执行。
https://stackoverflow.com/questions/66164569
复制相似问题