我很好奇在这里是否还有其他人在运行程序中遇到过类似的问题,如下所述。(我现在还不能去CloudRunner )
正在执行的查询返回的行数略少于1 800万行。如果我为查询添加了一个限制(例如: 10000),那么datafow就会像预期的那样工作。不包括在代码片段中的是WriteToBleve接收器,它是支持写入白热化索引的自定义接收器。
正在使用的python是2.2.0,但我准备启动一些java.
我在运行管道时看到的最后一条日志消息是:
警告:root:Dataset my-project:temp_dataset_7708fbe7e7694cd49b8b0de07af2470b不存在,因此我们将使用location=None作为临时创建它
数据集是正确创建和填充的,当我调试到管道中时,我可以看到结果正在迭代,但这个管道本身似乎从未达到写入阶段。
options = {
"project": "my-project",
"staging_location": "gs://my-project/staging",
"temp_location": "gs://my-project/temp",
"runner": "DirectRunner"
}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
p = beam.Pipeline(options=pipeline_options)
p | 'Read From Bigquery' >> beam.io.Read(beam.io.BigQuerySource(
query=self.build_query(),
use_standard_sql=True,
validate=True,
flatten_results=False,
)) | 'Write to Bleve' >> WriteToBleve()
result = p.run()
result.wait_until_finish()发布于 2018-02-14 00:14:24
https://stackoverflow.com/questions/48775215
复制相似问题