一直在寻找这个问题,但所有的答案都是旧的,与我的问题无关。我试图将sql参数作为Jinja模板传递,但无法传递。我对气流有点陌生,这是代码,我不确定Jinja模板是否在PostgresOperator中工作。如果有任何其他方法可以传递,请告诉我,再次我不是试图在SQL查询或文件中传递jinja模板,而是使用它无法呈现的Jinja模板传递SQL文件的名称。
PostgerOperatorCode:
write_to_postgres = PostgresOperator(
task_id ="write_to_postgres",
postgres_conn_id="mypostgres",
#TODO investigate, why this is not working
sql ="queries{{ execution_date.hour }}{{ execution_date.day }}.sql"
) 模板在BashOperator和PythonOpertor上工作,但在Postgres中不起作用。UI中的气流not服务器表示,无法呈现模板。

BashOperatorCode:
extract_gz = BashOperator(
task_id="extract_downloaded_gz",
bash_command="gunzip --force /opt/airflow/dags/wikipageviews{{ execution_date.hour }}{{ execution_date.day }}.gz"
)对于相同代码的BashOperator,它可以工作:

更新:我注意到一件奇怪的事情,当我不把.sql放在sql参数中时,模板就会被正确地呈现,这看起来很奇怪。下面是在没有.sql扩展的sql参数下呈现的带有Jinja模板的屏幕截图,但是它在bash操作符中工作,正如您在上面的代码中看到的那样,它最后使用了.sql。
write_to_postgres = PostgresOperator(
task_id ="write_to_postgres",
postgres_conn_id="mypostgres",
#TODO investigate, why this is not working
sql="queries{{ execution_date.hour }}{{ execution_date.day }}"
) 渲染输出

发布于 2022-10-16 20:25:33
这是我们可以做的一种潜在的方法,对我来说,它是有效的。
def _write_to_postgres(execution_date):
hour = execution_date.hour
day = execution_date.day
filename = f"queries{hour}{day}.sql"
import psycopg2
try:
conn = psycopg2.connect(database="airflow",user='airflow', password='airflow', host='postgres', port= '5432')
except:
raise AirflowFailException
else:
cursor = conn.cursor()
with open(f"/opt/airflow/dags/{filename}","r") as f:
for statement in f.readlines():
print(statement)
cursor.execute(statement)
conn.commit()
cursor.close()
``https://stackoverflow.com/questions/74082517
复制相似问题