我试图通过传入sql文件的路径来创建一个SnowflakeOpereator。
SnowflakeOperator(task_id="some_task_id",
sql='sql/test.sql',
**snowflake_connection)但是,运算符失败,因为它尝试将sql/test.sql作为sql语句执行,而不是将SQL文件作为模板化sql读取。
我还尝试创建了一个继承自BaseOperator的自定义运算符,并添加了以下模板字段:
template_fields = ('sql')
template_ext = ('.sql',)它导致了同样的行为。
有人能给点建议吗?
发布于 2020-05-23 23:10:23
使用Airflow 1.10.10,下面是一个使用SnowflakeOperator的示例,其中传入了一个指向SQL文件的路径,以及使用了模板特性:)
DAG文件dummy_dag.py,存储在DAG文件夹中
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
dag = DAG('dummy_dag', schedule_interval='0 * * * *', start_date=datetime(2020, 5, 23))
snowflake_operator = SnowflakeOperator(task_id='snowflake_task', sql='./test.sql', params={'dynamic_table': 'my_dynamic_table'}, dag=dag)
dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)
snowflake_operator >> dummy_operatorSQL文件test.sql,存储在dags文件夹中
SELECT * FROM table;
-- templatised table name using Jinja
SELECT * FROM {{ params.dynamic_table }};在Airflow UI中,如果您查看snowflake_task的呈现模板,您应该会看到将要执行的SQL。
SELECT * FROM table;
-- templatised table name using Jinja
SELECT * FROM my_dynamic_table;发布于 2020-06-30 16:02:35
@Lee:不确定这是否有帮助,但我相信template_fields是一个列表类型。您定义它的方式使它成为一个元组。不确定这是否重要,但如果你仍然卡住了,这是值得一试的。
发布于 2020-08-27 01:59:48
如果你有多行-这对我不起作用
不支持在单个API调用中包含多个SQL语句;请改用每个语句一个API调用。回溯(最近一次调用)
https://stackoverflow.com/questions/61948379
复制相似问题