我们正在转到气流2.0,我注意到下面的错误,似乎SnowflakeHook无法读取位于'sql‘目录中的查询,这在气流1.x中运行得很好:
snowflake.connector.errors.ProgrammingError: 001003 (42000):019c5ac7-0602-31b5-0000-01b526e4fa46: SQL编译错误:语法错误行1,位置0意外的'sql‘。 在处理上述异常期间,发生了另一个异常: common.snowflake.exceptions.SQLCompilationSnowflakeException: 001003 (42000):019c5ac7-0602-31b5-0000-01b526e4fa46: SQL编译错误:位于0位的语法错误行1意外的'sql‘。处理查询时发生错误(019c5ac7-0602-31b5-0000-01b526e4fa46):sql/my_query.sql
下面是我们创建的类:
class SnowQueryOperator(BaseOperator):
template_fields = ['sql']
@apply_defaults
def __init__(self,
sql,
params=None,
warehouse=Variable.get('default_snowflake_warehouse'),
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.sql = sql
self.params = params
self.warehouse = warehouse
def execute(self, context):
sf_hook = SnowflakeHook(warehouse=self.warehouse)
sf_hook.execute_query(self.sql)我们就是这样使用它的:
t4 = SnowQueryOperator(
task_id='running_snowflake_query',
sql='sql/my_query.sql',
retries=0,
pool='airflow')发布于 2021-05-19 20:06:48
我不认为这段代码适用于气流1.10,您缺少的template_ext将允许您从.sql文件中读取。
class SnowQueryOperator(BaseOperator):
template_fields = ('sql')
template_ext = ('.sql',)我不清楚为什么要自己实现这个操作符。气流有雪花供应商,有SnowflakeOperator。
您可以用pip install apache-airflow-providers-snowflake安装它,然后将操作符导入为from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
https://stackoverflow.com/questions/67608655
复制相似问题