我正在做Airflow 1.10。
我在KubernetesPodOperator上运行命令时遇到了问题,在DAG运行时会对整个命令进行评估。
我在DAG运行时生成命令,因为命令的一些参数取决于用户传递的参数。
正如我从文档中读到的,KubernetesPodOperator需要字符串列表或jinja模板列表:
:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.我有PythonOperator,它生成命令并将其推送到XCOM和KubernetesPodOperator,在这两个命令的参数中,我传递由PythonOperator生成的命令。
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
def command_maker():
import random # random is to illustrate that we don't know arguments value before runtime
return f"my_command {random.randint(1, 10)} --option {random.randint(1, 4)}"
def create_tasks(dag):
first = PythonOperator(
task_id="generate_command",
python_callable=command_maker,
provide_context=True,
dag=dag,
)
second = KubernetesPodOperator(
namespace='some_namespace',
image='some_image',
name='execute_command',
dag=dag,
arguments=[f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="return_value")}}']
)
second.set_upstream(first)不幸的是,KubernetesPodOperator不能正确运行这个命令,因为他尝试运行类似下面这样的命令:
[my_command 4 --option 2]有没有办法在KubernetesPodOperator运行时评估这个列表,或者强制我将所有运行时参数推到单独的XCOM中?我想避免这样的解决方案,因为它需要我的项目中的许多变化。
arguments=[
"my_command",
f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="first_argument")}}',
"--option",
f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="second_argument")}}',
]发布于 2021-08-10 19:46:35
为了得到Airflow 1.10的有效解决方案,我不得不使用BaseOperator.pre_execute钩子。
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.lineage import prepare_lineage
class UnpackCommandKubernetesPodOperator(KubernetesPodOperator):
@prepare_lineage
def pre_execute(self, context):
self.arguments = self.arguments[0].split(" ")发布于 2021-08-06 11:48:12
问题是JINJA模板默认以字符串的形式返回模板。
然而,在最近的Airflow中(从Airlfow 2.1.0开始),您可以将模板呈现为原生python对象:
通过在创建DAG时使用render_template_as_native_obj=True参数。
然后,您需要以python的literal_eval能够将其转换为python对象的方式来格式化输出。在您的示例中,您必须使输出类似于:
[ 'my_command', '4', '--option', '2' ]
请注意,此参数将返回所有模板的本机对象,因此,如果它们返回一些literal_eval understands的值,它们也将转换为本机类型(并且您可能会有一些意想不到的副作用。
https://stackoverflow.com/questions/68679268
复制相似问题