我正在使用pytest对我的气流DAG进行完整性测试,这是我当前的文件夹结构:
|-- dags
| |-- 01_lasic_retraining_overview.py
| |-- 02_lasic_retraining_sagemaker_autopilot.py
| |-- 03_lasic_retraining_h20_automl.py
| |-- __init__.py
| `-- common
| |-- __init__.py
| `-- helper.py
|-- docker-compose.yaml
|-- newrelic.ini
|-- plugins
|-- requirements.txt
|-- sample.env
|-- setup.sh
|-- test.sh
`-- tests
|-- common
| `-- test_helper.py
`-- dags
|-- test_02_lasic_retraining_sagemaker_autopilot.py
|-- test_03_lasic_retraining_h20_automl.py
`-- test_dag_integrity.py在我的所有dags中,除了01_lasic_retraining_overview.py(不是测试),我将帮助函数从dags/common/helper.py导入到它们,这就是测试失败的原因:
import airflow
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
> from common.helper import _create_connection, _etl_lasic
E ModuleNotFoundError: No module named 'common'
dags/03_lasic_retraining_h20_automl.py:6: ModuleNotFoundError
=================================== short test summary info ===================================
FAILED tests/dags/test_dag_integrity.py::test_dag_integrity[/Users/yravindranath/algo_lasic2_ct_pipeline/tests/dags/../../dags/02_lasic_retraining_sagemaker_autopilot.py]
FAILED tests/dags/test_dag_integrity.py::test_dag_integrity[/Users/yravindranath/algo_lasic2_ct_pipeline/tests/dags/../../dags/03_lasic_retraining_h20_automl.py]现在,这段代码在我的停靠容器中运行时没有问题。我尝试过但没有成功的事情:
folder.
python -m pytest tests/
dags
PYTHONPATH=. pytest
中的__init__.py文件,将__init__py添加到tests中
完整性测试代码在/tests/dags/test_dag_integrity.py上
import re
import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
# go to the root dir and browse for any files that match the pattern
# this will find all the dag files
DAG_PATH = os.path.join(
os.path.dirname(__file__),
"..",
"..",
"dags/**/0*.py",
)
# holds a list of all the dag files
DAG_FILES = glob.glob(
DAG_PATH,
recursive=True,
)
# filter the files to exclude the 01 dag run as that is just a plan of the
# pipeline
DAG_FILES = [file for file in DAG_FILES if not re.search("/01", file)]
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
# Load file
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(
module_name,
module_path,
)
module = importlib.util.module_from_spec(
mod_spec, # type: ignore
)
mod_spec.loader.exec_module(module) # type: ignore
# all objects of class DAG found in file
dag_objects = [
var
for var in vars(module).values()
if isinstance(
var,
DAG,
)
]
# check if DAG objects were found in the file
assert dag_objects
# check if there are no cycles in the dags
for dag in dag_objects:
dag.test_cycle() # type: ignore发布于 2021-08-20 08:51:43
你需要检查一下你的PYTHONPATH是什么。您可能在您的dags中没有PYTHONPATH。您的PYTHONPATH可能指向文件结构的根目录,因此导入它的“公共”文件夹的正确方法是
import dags.common类似于您常用的测试代码
import tests.commonPython (甚至python 3)没有一个非常好的机制来相对于当前加载的文件导入内容。即使有“相对”进口。(在前面)--他们令人困惑,工作方式与你想象的不一样。避免使用它们。只要确保你的。
还要避免将PYTHONPATH设置为".“。它使您的导入工作与当前目录不同。最好的方法是设置一次并导出。
export PYTHONPATH="$(pwd)"上面的内容将PYTHONPATH设置为当前所在的目录,并将其设置为绝对路径。
发布于 2021-08-20 12:57:58
我还在Docker容器中运行应用程序,其中@Jarek Potiuk提供的答案在实际运行DAG时不起作用,因此我使用的是一种超级黑客方式,只包括在docker中工作的导入部件和在本地工作的导入部件。
try:
# Works locally with tests
from common.helper import _create_connection, _etl_lasic
except ImportError:
# Works in docker container
from dags.common.helper import _create_connection, _etl_lasic发布于 2021-08-20 23:35:21
在这里提出一个疯狂的想法,尝试将__init__.py添加到*/dag或*/common以及*/tests中。
https://stackoverflow.com/questions/68856714
复制相似问题