我使用BigQuery SQL创建了一个数据管道。首先从云存储中导入CSV文件,然后进行不同的分析,包括利用BigQueryML地理函数进行预测建模和利用解析函数进行KPI计算。
我能够成功地手动运行不同的查询,现在我想要自动化数据管道。
我的第一个选择是DataFlow SQL,但事实证明Dataflow SQL查询语法不支持地理函数。
DataFlow python不是一种选择,因为完整的分析是用SQL完成的,我希望保持这种方式。
我的问题是,有哪些其他GCP选项可用于自动化数据管道。
发布于 2020-04-22 16:55:55
正如我在评论中提到的,如果需要编排查询,可以使用Cloud Composer,这是一个完全托管的Airflow集群。
我创建了下面的代码,或多或少地向您展示了如何使用此工具编排查询。请注意,这是一个基本的代码,可以改进的编码标准。代码基本上编排了3个查询:
bigquery-public-data.catalonian\_mobile\_coverage.mobile\_data\_2015\_2017","..orchestration_1",task_id = 'xxxxxxxx',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,use_legacy_sql = False ) run_second_query = bigquery_operator.BigQueryOperator( sql = "SELECT * FROM <your\_project>.orchestration\_1 ORDER BY date date 10000 ","..orchestration_2",task_id = 'yyyyyyyy',write_disposition = "WRITE_TRUNCATE",#create_disposition =“#create_disposition=”,allow_large_results = True,use_legacy_sql = False ) run_third_query = bigquery_operator.BigQueryOperator( sql = "SELECT r_lat(Lat) r_lat,r_lat(Long) r_long,count(1) <your\_project>.orchestration\_2 GROUP BY r_lat,r_long",destination_dataset_table = task_id = 'zzzzzzzz',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,use_legacy_sql = False )#定义DAG依赖项。run_first_query >> run_second_query >> run_third_query一步一步前进:
default_dag_args的dict,在创建DAG时将进一步使用它。
default_dag_args ={ 'start_date':datetime.datetime(2020,4,22,15,40),'email_on_failure':False,'email_on_retry':False,‘retry’:1,'retry_delay':datetime.timedelta(minutes=1),'project_id':"",}default_dag_args dict作为默认参数传递,并添加将定义何时运行DAG的schedule interval参数。您可以将此参数与一些预置表达式一起使用,也可以使用CRON表达式,因为您可以看到这里。
使用models.DAG(‘composer_airflow_bigquery_orche体外’,schedule_interval = "*/5 ** *",default_args=default_dag_args)作为dag:bigquery-public-data.catalonian\_mobile\_coverage.mobile\_data\_2015\_2017",destination_dataset_table = 'xxxxxxxx',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,( "..orchestration_2",= 'yyyyyyyy',destination_dataset_table =‘yyyyyyyy’,write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,( use_legacy_sql = False ) run_third_query = bigquery_operator.BigQueryOperator( sql = "SELECT round(lat) r_lat,round(long) r_long,count(1) count FROM<your\_project>.orchestration\_2 GROUP BY r_lat,r_long",destination_dataset_table =bigquery_operator.BigQueryOperator task_id = 'zzzzzzzz',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,use_legacy_sql = False )最后,我想添加这个文章,讨论如何在使用CRON表达式时正确设置start_date和schedule_interval。
发布于 2020-04-21 21:31:55
BigQuery有一个内置的调度机制,目前处于beta特性中。
若要自动生成本机SQL管道,可以使用此实用程序。使用CLI:
$ bq query \
--use_legacy_sql=false \
--destination_table=mydataset.mytable \
--display_name='My Scheduled Query' \
--replace=true \
'SELECT
1
FROM
mydataset.test'https://stackoverflow.com/questions/61345381
复制相似问题