我正在运行一个GCP数据流管道,将其作为模板部署在GCP上。我需要在管道中运行一条BigQuery read语句。它的条件参数需要动态传递。我该怎么做呢?
我想要运行的查询类似于
select * from tabel_name where field1=[dynamic_value]以下是运行该查询的示例代码
import apache_beam as beam
query_string = "select * from tabel_name where field1=[dynamic_value]"
options = {'project': PROJECT_ID, 'runner': 'Direct', "temp_location": "gs://my_bucket/temp",'region': "australia-southeast1", }
pipeline_options = beam.pipeline.PipelineOptions(sys.argv,**options)
custom_options = pipeline_options.view_as(MyOptions)
with beam.Pipeline(options=pipeline_options) as pipeline:
sample = (
pipeline
| 'QueryTable' >> beam.io.ReadFromBigQuery(query=query_string, use_standard_sql=False)
| "Processing" >> beam.ParDo(MyPreprocessor())
| beam.Map(print))我需要从命令行选项--dynamic_value传递dynamic_value。如果它不是模板,我可以将它作为sys.argv参数传递,但是如果我部署一个模板,它会将dynamic_value作为PipelineOption。如何动态创建查询?令人惊讶的是,beam.io.ReadFromBigQuery方法有一种将dataset、project_id和表作为参数的机制,但没有任何参数,我们可以在其中为数据指定过滤器。查询整个表是不必要的,并且开销很大。有没有人能提供同样的解决方案。
发布于 2021-02-17 02:02:06
您应该考虑使用Flex Templates来支持作为模板的查询的完全动态配置。
https://stackoverflow.com/questions/66201380
复制相似问题