试图基于KubernetesPodOperator制作我自己的组件。我能够定义组件并将其添加到组件列表中,但是当尝试运行它时,我得到:
节点'KubernetesPodOperator‘的运算符'KubernetesPodOperator’不在可用运算符列表中配置。请将“KubernetesPodOperator”的完全限定包名添加到KubernetesPodOperator配置中。
和错误:
Traceback (most recent call last):
File "/opt/conda/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
result = await result
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/handlers.py", line 120, in post
response = await PipelineProcessorManager.instance().process(pipeline)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/processor.py", line 134, in process
res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 284, in __await__
yield self # This tells Task to wait for completion.
File "/opt/conda/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/opt/conda/lib/python3.9/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 122, in process
pipeline_filepath = self.create_pipeline_file(pipeline=pipeline,
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 420, in create_pipeline_file
target_ops = self._cc_pipeline(pipeline, pipeline_name)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 368, in _cc_pipeline
raise ValueError(f"Operator '{component.name}' of node '{operation.name}' is not configured "
ValueError: Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.
在查看src代码之后,我可以在airflow.py中看到以下几行:
# This specifies the default airflow operators included with Elyra. Any Airflow-based
# custom connectors should create/extend the elyra configuration file to include
# those fully-qualified operator/class names.
available_airflow_operators = ListTrait(
CUnicode(),
["airflow.operators.slack_operator.SlackAPIPostOperator",
"airflow.operators.bash_operator.BashOperator",
"airflow.operators.email_operator.EmailOperator",
"airflow.operators.http_operator.SimpleHttpOperator",
"airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
"airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator"],
help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines. These operators must
be fully qualified (i.e., prefixed with their package names).
""",
).tag(config=True)
因此,我不确定这是否可以从客户扩展。
发布于 2022-02-16 00:16:39
The available_airflow_operators list是Elyra的一种类似的可配置特性。您必须向这个列表中添加完全限定的KubernetesPodOperator包名,这样它才能正确地创建DAG。
为此,使用jupyter elyra --generate-config从命令行生成一个配置文件。打开所创建的文件并添加以下行(如果您希望保持文件的组织性,可以将其添加到“高度PipelineProcessor(LoggingConfigurable)”标题下):
c.AirflowPipelineProcessor.available_airflow_operators.append("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator")如果不是上述的话,将字符串值更改为正确的用例包(确保它以所需操作符的类名结尾)。如果需要添加多个包,也可以使用extend而不是append。
编辑:这里是指向相关文档的链接
https://stackoverflow.com/questions/71113714
复制相似问题