首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在beam.io.WriteToBigQuery中调用beam.DoFn

在beam.io.WriteToBigQuery中调用beam.DoFn
EN

Stack Overflow用户
提问于 2020-05-27 21:46:47
回答 2查看 1.8K关注 0票数 1

我创建了一个带有一些参数的数据流模板。当我将数据写入BigQuery时,我希望使用这些参数来确定应该写入哪个表。我尝试按照下面的链接中的建议在一个WriteToBigQuery中调用ParDo。

如何使用Apache中的运行时值提供程序写入大查询?

该管道成功运行,但它没有创建数据或将数据加载到BigQuery。知道有什么问题吗?

代码语言:javascript
复制
def run():
  pipeline_options = PipelineOptions()
  pipeline_options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

  with beam.Pipeline(options=pipeline_options) as p:
    custom_options = pipeline_options.view_as(CustomOptions)

    _ = (
      p
      | beam.Create([None])
      | 'Year to periods' >> beam.ParDo(SplitYearToPeriod(custom_options.year))
      | 'Read plan data' >> beam.ParDo(GetPlanDataByPeriod(custom_options.secret_name))
      | 'Transform record' >> beam.Map(transform_record)
      | 'Write to BQ' >> beam.ParDo(WritePlanDataToBigQuery(custom_options.year))
    )

if __name__ == '__main__':
  run()
代码语言:javascript
复制
class CustomOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--year', type=int)
    parser.add_value_provider_argument('--secret_name', type=str)
代码语言:javascript
复制
class WritePlanDataToBigQuery(beam.DoFn):
  def __init__(self, year_vp):
    self._year_vp = year_vp

  def process(self, element):
    year = self._year_vp.get()

    table = f's4c.plan_data_{year}'
    schema = {
      'fields': [ ...some fields properties ]
    }

    beam.io.WriteToBigQuery(
      table=table,
      schema=schema,
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
      method=beam.io.WriteToBigQuery.Method.FILE_LOADS
    )
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-05-28 22:31:24

您已经在您的PTransform方法的process方法中实例化了beam.io.gcp.bigquery.WriteToBigQuery。这里有几个问题:

  • 对输入process的每个元素调用PCollection方法。它不用于生成管线图。这种动态构造图的方法将无法工作。
  • 一旦您将它从DoFn中移出,您需要将PTransform beam.io.gcp.bigquery.WriteToBigQuery应用到PCollection中,这样才能产生任何效果。请参阅束流pydoc束流教程文档

若要为表名创建派生值提供程序,需要“嵌套”值提供程序。不幸的是,这是Python不支持。不过,您可以直接使用值提供程序选项。

作为一个高级选项,您可能对尝试"flex模板“感兴趣,它实际上将整个程序打包为一个停靠映像,并使用参数执行它。

票数 1
EN

Stack Overflow用户

发布于 2020-09-29 06:39:56

如果目标是代码接受参数,而不是表路径的硬编码字符串,下面是实现该目标的一种方法:

  • 将表参数添加为CustomOptions
  • 在run函数中添加CustomOptions参数作为默认字符串
代码语言:javascript
复制
...

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--gcs_input_file_path',
            type=str,
            help='GCS Input File Path'
        )
        parser.add_value_provider_argument(
            '--project_id',
            type=str,
            help='GCP ProjectID'
        )
        parser.add_value_provider_argument(
            '--dataset',
            type=str,
            help='BigQuery DataSet Name'
        )
        parser.add_value_provider_argument(
            '--table',
            type=str,
            help='BigQuery Table Name'
        )

def run(argv=None):

    pipeline_option = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_option)
    custom_options = pipeline_option.view_as(CustomOptions)
    pipeline_option.view_as(SetupOptions).save_main_session = True
    pipeline_option.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--gcp_project_id',
        type=str,
        help='GCP ProjectID',
        default=str(custom_options.project_id)
    )
    parser.add_argument(
        '--dataset',
        type=str,
        help='BigQuery DataSet Name',
        default=str(custom_options.dataset)
    )
    parser.add_argument(
        '--table',
        type=str,
        help='BigQuery Table Name',
        default=str(custom_options.table)
    )

    static_options, _ = parser.parse_known_args(argv)
    path = static_options.gcp_project_id + ":" + static_options.dataset + "." + static_options.table

    data = (
            pipeline
            | "Read from GCS Bucket" >>
            beam.io.textio.ReadFromText(custom_options.gcs_input_file_path)
            | "Parse Text File" >>
            beam.ParDo(Split())
            | 'WriteToBigQuery' >>
            beam.io.WriteToBigQuery(
                path,
                schema=Schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
    )

    result = pipeline.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
  • 在shell文件中的管道构建时传递表路径
代码语言:javascript
复制
python template.py \
  --dataset dataset_name \
  --table table_name \
  --project project_name \
  --runner DataFlowRunner \
  --region region_name \
  --staging_location gs://bucket_name/staging \
  --temp_location gs://bucket_name/temp \
  --template_location gs://bucket_name/templates/template_name
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62053420

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档