首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KubernetesPodOperator -运行从XCOM拉出的整个命令

KubernetesPodOperator -运行从XCOM拉出的整个命令
EN

Stack Overflow用户
提问于 2021-08-06 09:26:17
回答 2查看 276关注 0票数 0

我正在做Airflow 1.10。

我在KubernetesPodOperator上运行命令时遇到了问题,在DAG运行时会对整个命令进行评估。

我在DAG运行时生成命令,因为命令的一些参数取决于用户传递的参数。

正如我从文档中读到的,KubernetesPodOperator需要字符串列表或jinja模板列表:

代码语言:javascript
复制
    :param arguments: arguments of the entrypoint. (templated)
        The docker image's CMD is used if this is not provided.

我有PythonOperator,它生成命令并将其推送到XCOM和KubernetesPodOperator,在这两个命令的参数中,我传递由PythonOperator生成的命令。

代码语言:javascript
复制
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator


def command_maker():
    import random # random is to illustrate that we don't know arguments value before runtime
    return f"my_command {random.randint(1, 10)} --option {random.randint(1, 4)}"

def create_tasks(dag):
    first = PythonOperator(
        task_id="generate_command",
        python_callable=command_maker,
        provide_context=True,
        dag=dag,
    )
    second = KubernetesPodOperator(
        namespace='some_namespace',
        image='some_image',
        name='execute_command',
        dag=dag,
        arguments=[f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="return_value")}}']
    )
    second.set_upstream(first)

不幸的是,KubernetesPodOperator不能正确运行这个命令,因为他尝试运行类似下面这样的命令:

代码语言:javascript
复制
[my_command 4 --option 2]

有没有办法在KubernetesPodOperator运行时评估这个列表,或者强制我将所有运行时参数推到单独的XCOM中?我想避免这样的解决方案,因为它需要我的项目中的许多变化。

代码语言:javascript
复制
         arguments=[
            "my_command",
            f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="first_argument")}}',
            "--option",
            f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="second_argument")}}',
         ]
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-08-10 19:46:35

为了得到Airflow 1.10的有效解决方案,我不得不使用BaseOperator.pre_execute钩子。

代码语言:javascript
复制
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.lineage import prepare_lineage

class UnpackCommandKubernetesPodOperator(KubernetesPodOperator):
    @prepare_lineage
    def pre_execute(self, context):
        self.arguments = self.arguments[0].split(" ")
票数 1
EN

Stack Overflow用户

发布于 2021-08-06 11:48:12

问题是JINJA模板默认以字符串的形式返回模板。

然而,在最近的Airflow中(从Airlfow 2.1.0开始),您可以将模板呈现为原生python对象:

https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#rendering-fields-as-native-python-objects

通过在创建DAG时使用render_template_as_native_obj=True参数。

然后,您需要以python的literal_eval能够将其转换为python对象的方式来格式化输出。在您的示例中,您必须使输出类似于:

[ 'my_command', '4', '--option', '2' ]

请注意,此参数将返回所有模板的本机对象,因此,如果它们返回一些literal_eval understands的值,它们也将转换为本机类型(并且您可能会有一些意想不到的副作用。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68679268

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档