首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使BigQuery SQL管道自动化

如何使BigQuery SQL管道自动化
EN

Stack Overflow用户
提问于 2020-04-21 13:59:06
回答 2查看 1.2K关注 0票数 2

我使用BigQuery SQL创建了一个数据管道。首先从云存储中导入CSV文件,然后进行不同的分析,包括利用BigQueryML地理函数进行预测建模和利用解析函数进行KPI计算。

我能够成功地手动运行不同的查询,现在我想要自动化数据管道。

我的第一个选择是DataFlow SQL,但事实证明Dataflow SQL查询语法不支持地理函数。

DataFlow python不是一种选择,因为完整的分析是用SQL完成的,我希望保持这种方式。

我的问题是,有哪些其他GCP选项可用于自动化数据管道。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-04-22 16:55:55

正如我在评论中提到的,如果需要编排查询,可以使用Cloud Composer,这是一个完全托管的Airflow集群。

我创建了下面的代码,或多或少地向您展示了如何使用此工具编排查询。请注意,这是一个基本的代码,可以改进的编码标准。代码基本上编排了3个查询:

  1. 第一个从公共表读取并写入项目中的另一个表。
  2. 第二个查询读取在第一个查询中创建的表,并在date列中选择最新的10000行。之后,它将结果保存到项目中的一个表中。
  3. 第三种方法读取步骤2中创建的表,并计算一些聚合。之后,它将结果保存到项目中的另一个表中。 从airflow.contrib.operators导入bigquery_operator的气流导入模型导入日期时间“”下面介绍的条件将按schedule_interval属性中指定的方式每五分钟运行一次DAG,从start_date属性中指定的日期时间开始“default_dag_args ={ 'start_date':datetime.datetime(2020,4,22,15,40),‘email_on_ five’:False,'email_on_retry':False,‘retry’:1,‘'retry_delay':datetime.timedelta(minutes=1),'project_id':"",} models.DAG(’composer_airflow_bigquery_编排‘,schedule_interval = "*/5 ** *",default_args=default_dag_args)作为dag: run_first_query = bigquery_operator.BigQueryOperator( sql = "SELECT ** bigquery-public-data.catalonian\_mobile\_coverage.mobile\_data\_2015\_2017","..orchestration_1",task_id = 'xxxxxxxx',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,use_legacy_sql = False ) run_second_query = bigquery_operator.BigQueryOperator( sql = "SELECT * FROM <your\_project>.orchestration\_1 ORDER BY date date 10000 ","..orchestration_2",task_id = 'yyyyyyyy',write_disposition = "WRITE_TRUNCATE",#create_disposition =“#create_disposition=”,allow_large_results = True,use_legacy_sql = False ) run_third_query = bigquery_operator.BigQueryOperator( sql = "SELECT r_lat(Lat) r_lat,r_lat(Long) r_long,count(1) <your\_project>.orchestration\_2 GROUP BY r_lat,r_long",destination_dataset_table = task_id = 'zzzzzzzz',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,use_legacy_sql = False )#定义DAG依赖项。run_first_query >> run_second_query >> run_third_query

一步一步前进:

  • 首先,它引入了一些气流库,如模型和bigquery_operator。 从airflow.contrib.operators导入bigquery_operator的气流导入模型
  • 然后,它定义了一个名为default_dag_args的dict,在创建DAG时将进一步使用它。 default_dag_args ={ 'start_date':datetime.datetime(2020,4,22,15,40),'email_on_failure':False,'email_on_retry':False,‘retry’:1,'retry_delay':datetime.timedelta(minutes=1),'project_id':"",}
  • 创建DAG时,将default_dag_args dict作为默认参数传递,并添加将定义何时运行DAG的schedule interval参数。您可以将此参数与一些预置表达式一起使用,也可以使用CRON表达式,因为您可以看到这里。 使用models.DAG(‘composer_airflow_bigquery_orche体外’,schedule_interval = "*/5 ** *",default_args=default_dag_args)作为dag:
  • 之后,您可以创建操作符的实例。在本例中,我们只使用BigQueryOperator run_first_query = bigquery_operator.BigQueryOperator( sql = "SELECT * FROM bigquery-public-data.catalonian\_mobile\_coverage.mobile\_data\_2015\_2017",destination_dataset_table = 'xxxxxxxx',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,( "..orchestration_2",= 'yyyyyyyy',destination_dataset_table =‘yyyyyyyy’,write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,( use_legacy_sql = False ) run_third_query = bigquery_operator.BigQueryOperator( sql = "SELECT round(lat) r_lat,round(long) r_long,count(1) count FROM<your\_project>.orchestration\_2 GROUP BY r_lat,r_long",destination_dataset_table =bigquery_operator.BigQueryOperator task_id = 'zzzzzzzz',write_disposition = "WRITE_TRUNCATE",#create_disposition = "",allow_large_results = True,use_legacy_sql = False )
  • 作为最后一步,我们可以定义DAG的依赖项。这段代码意味着run_second_query操作依赖于run_first_query的结论,所以它就这样做了。 run_first_query >> run_second_query >> run_third_query

最后,我想添加这个文章,讨论如何在使用CRON表达式时正确设置start_date和schedule_interval。

票数 2
EN

Stack Overflow用户

发布于 2020-04-21 21:31:55

BigQuery有一个内置的调度机制,目前处于beta特性中。

若要自动生成本机SQL管道,可以使用此实用程序。使用CLI:

代码语言:javascript
复制
$ bq query \
--use_legacy_sql=false \
--destination_table=mydataset.mytable \
--display_name='My Scheduled Query' \
--replace=true \
'SELECT
1
FROM
mydataset.test'
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61345381

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档