I created a Dataflow template with some parameters. When writing data to BigQuery, I want to use those parameters to determine which table it should write to. I tried calling WriteToBigQuery inside a ParDo, as suggested in the link below.
The pipeline ran successfully, but it did not create any data or load it into BigQuery. Any idea what might be wrong?
def run():
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
    with beam.Pipeline(options=pipeline_options) as p:
        custom_options = pipeline_options.view_as(CustomOptions)
        _ = (
            p
            | beam.Create([None])
            | 'Year to periods' >> beam.ParDo(SplitYearToPeriod(custom_options.year))
            | 'Read plan data' >> beam.ParDo(GetPlanDataByPeriod(custom_options.secret_name))
            | 'Transform record' >> beam.Map(transform_record)
            | 'Write to BQ' >> beam.ParDo(WritePlanDataToBigQuery(custom_options.year))
        )

if __name__ == '__main__':
    run()

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--year', type=int)
        parser.add_value_provider_argument('--secret_name', type=str)

class WritePlanDataToBigQuery(beam.DoFn):
    def __init__(self, year_vp):
        self._year_vp = year_vp

    def process(self, element):
        year = self._year_vp.get()
        table = f's4c.plan_data_{year}'
        schema = {
            'fields': [ ...some fields properties ]
        }
        beam.io.WriteToBigQuery(
            table=table,
            schema=schema,
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS
        )

Posted on 2020-05-28 22:31:24
You have instantiated the PTransform beam.io.gcp.bigquery.WriteToBigQuery inside the process method of your DoFn. There are a few problems here:
process is called once for each element of the input PCollection. It is not used for building the pipeline graph, so this approach of constructing the graph dynamically will not work. You need to move the write out of the DoFn and apply the PTransform beam.io.gcp.bigquery.WriteToBigQuery to a PCollection for it to have any effect. See the Beam pydoc or the Beam tutorial documentation. To create a derived value provider for your table name, you would need a "nested" value provider. Unfortunately, that is not supported in the Python SDK. You can, however, use the value provider option directly.
As a more advanced option, you may be interested in trying out "flex templates", which essentially package your whole program as a Docker image and execute it with parameters.
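For reference, a flex-template workflow looks roughly like the following. Every path, image name, job name, and parameter here is a placeholder, and the flag spellings should be checked against the current gcloud documentation:

```shell
# Package the pipeline's Docker image as a flex-template spec file.
gcloud dataflow flex-template build gs://bucket_name/templates/spec.json \
    --image gcr.io/project_name/pipeline-image:latest \
    --sdk-language PYTHON

# Launch a job from the spec; parameters are ordinary pipeline options,
# so no ValueProvider machinery is needed.
gcloud dataflow flex-template run plan-data-job \
    --template-file-gcs-location gs://bucket_name/templates/spec.json \
    --parameters year=2020 \
    --region region_name
```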
Posted on 2020-09-29 06:39:56
If the goal is for the code to accept parameters rather than a hard-coded string for the table path, here is one way to achieve that:
...

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--gcs_input_file_path',
            type=str,
            help='GCS Input File Path'
        )
        parser.add_value_provider_argument(
            '--project_id',
            type=str,
            help='GCP ProjectID'
        )
        parser.add_value_provider_argument(
            '--dataset',
            type=str,
            help='BigQuery DataSet Name'
        )
        parser.add_value_provider_argument(
            '--table',
            type=str,
            help='BigQuery Table Name'
        )

def run(argv=None):
    pipeline_option = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_option)
    custom_options = pipeline_option.view_as(CustomOptions)
    pipeline_option.view_as(SetupOptions).save_main_session = True
    pipeline_option.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--gcp_project_id',
        type=str,
        help='GCP ProjectID',
        default=str(custom_options.project_id)
    )
    parser.add_argument(
        '--dataset',
        type=str,
        help='BigQuery DataSet Name',
        default=str(custom_options.dataset)
    )
    parser.add_argument(
        '--table',
        type=str,
        help='BigQuery Table Name',
        default=str(custom_options.table)
    )
    static_options, _ = parser.parse_known_args(argv)
    path = static_options.gcp_project_id + ":" + static_options.dataset + "." + static_options.table

    data = (
        pipeline
        | "Read from GCS Bucket" >> beam.io.textio.ReadFromText(custom_options.gcs_input_file_path)
        | "Parse Text File" >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            path,
            schema=Schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )

    result = pipeline.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

python template.py \
    --dataset dataset_name \
    --table table_name \
    --project project_name \
    --runner DataFlowRunner \
    --region region_name \
    --staging_location gs://bucket_name/staging \
    --temp_location gs://bucket_name/temp \
    --template_location gs://bucket_name/templates/template_name

https://stackoverflow.com/questions/62053420
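Once the template is staged, a job can be launched from it with gcloud. A possible invocation, where the job name and all parameter values are placeholders matching the flags defined in the script above:

```shell
gcloud dataflow jobs run plan-data-load \
    --gcs-location gs://bucket_name/templates/template_name \
    --region region_name \
    --parameters gcs_input_file_path=gs://bucket_name/input.txt,project_id=project_name,dataset=dataset_name,table=table_name
```

Note that because the table path in this approach is assembled from the defaults parsed at template-creation time, overriding project_id, dataset, or table at run time only affects the ValueProvider options, not the path string baked into the graph.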