我试图使用pytest在本地测试kfp.v2.ds1 (它在管道上工作)中的kubeflow组件,但是与输入/输出参数以及固定设备一起挣扎。
下面是一个代码示例来说明这个问题:
首先,我创建了一个用于模拟dataset的夹具。这个夹具也是一个库贝流组件。
# ./fixtures/
@pytest.fixture
@component()
def sample_df(dataset: Output[Dataset]):
df = pd.DataFrame(
{
'name': ['Ana', 'Maria', 'Josh'],
'age': [15, 19, 22],
}
)
dataset.path += '.csv'
df.to_csv(dataset.path, index=False)
return让我们假设组件的年龄是原来的两倍。
# ./src/
@component()
def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
df = pd.read_csv(df_input.path)
double_df = df.copy()
double_df['age'] = double_df['age']*2
df_output.path += '.csv'
double_df.to_csv(df_output.path, index=False)然后,测试:
#./tests/
@pytest.mark.usefixtures("sample_df")
def test_double_ages(sample_df):
expected_df = pd.DataFrame(
{
'name': ['Ana', 'Maria', 'Josh'],
'age': [30, 38, 44],
}
)
df_component = double_ages(sample_df) # This is where I call the component, sample_df is an Input[Dataset]
df_output = df_component.outputs['df_output']
df = pd.read_csv(df_output.path)
assert df['age'].tolist() == expected_df['age'].tolist()但问题就发生在那里。应该作为输出传递的OutputDataset不是,因此组件无法正确地使用它,那么我将在assert df['age'].tolist() == expected_df['age'].tolist()上得到以下错误
AttributeError:'TaskOutputArgument‘对象没有属性'path’
显然,对象的类型是TaskOutputArgument,而不是Dataset。
有人知道怎么解决这个问题吗?或者如何在kfp组件中正确地使用pytest?我在网上搜索了很多,但没有找到任何线索。
发布于 2022-10-19 03:56:00
我花了一段时间研究这个问题,我的结论是,单独的组件不应该由kfp的设计进行单元测试。这意味着您必须依赖于单元测试每个组件的逻辑,将每个逻辑块封装在组件中,然后测试kfp管道的端到端功能。
我同意,如果有一种方法可以轻松地模拟输入和输出,那将是相当酷的,但我挖掘得相当深入,而且在这个时候,这似乎不是一个预期的用途(或一个简单的黑客)。
发布于 2022-10-25 07:32:53
在花了我一个下午的时间之后,我终于想出了一种方法来对基于python的KFP组件进行python测试。由于我在这个问题上找不到其他线索,我希望这能有助于:
访问要测试的函数
诀窍不是直接测试由@component装饰器创建的KFP组件。但是,您可以通过组件属性python_func访问内部修饰的Python函数。
模拟人工制品
关于Input和Output构件,当您绕过KFP访问和调用测试函数时,您必须手动创建它们并将它们传递给函数:
input_artifact = Dataset(uri='input_df_previously_saved.csv')
output_artifact = Dataset(uri='target_output_path.csv')我必须为Artifact.path属性的工作方式找到一个解决方案(它也适用于所有KFP Artifact子类:Dataset、Model、.)。如果您查看KFP源代码,您会发现它使用的是_get_path()方法,如果uri属性不以定义的云前缀之一"gs://"、"s3://"或"minio://"开头,则返回uri。当我们使用本地路径手动构建工件时,希望读取工件的path属性的测试组件将读取None值。
因此,我创建了一个简单的方法,用于构建Artifact (或Dataset或任何其他Artifact子类)的子类。构建的子类只需更改为返回uri值,而不是在非云uri的特定情况下返回None。
你的例子
将所有这些放在一起进行测试和安装,我们可以得到以下代码:
src/double_ages_component.py:要测试的组件这里没什么变化。我刚刚添加了pandas导入:
from kfp.v2.dsl import component, Input, Dataset, Output
@component
def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
import pandas as pd
df = pd.read_csv(df_input.path)
double_df = df.copy()
double_df['age'] = double_df['age'] * 2
df_output.path += '.csv'
double_df.to_csv(df_output.path, index=False)tests/utils.py:工件子类构建器import typing
def make_test_artifact(artifact_type: typing.Type):
class TestArtifact(artifact_type):
def _get_path(self):
return super()._get_path() or self.uri
return TestArtifact我仍然不确定这是最恰当的解决办法。您还可以手动为您使用的每个工件创建一个子类(示例中为Dataset)。或者您可以使用kfp.v2.dsl.Artifact直接模拟吡啶-模拟类。
tests/conftest.py:你的夹具我将示例dataframe创建者组件从夹具中分离出来。因此,我们有一个标准的KFP组件定义+一个构建其输出工件并调用其python函数的夹具:
from kfp.v2.dsl import component, Dataset, Output
import pytest
from tests.utils import make_test_artifact
@component
def sample_df_component(dataset: Output[Dataset]):
import pandas as pd
df = pd.DataFrame({
'name': ['Ana', 'Maria', 'Josh'],
'age': [15, 19, 22],
})
dataset.path += '.csv'
df.to_csv(dataset.path, index=False)
@pytest.fixture
def sample_df():
# define output artifact
output_path = 'local_sample_df.csv' # any writable local path. I'd recommend to use pytest `tmp_path` fixture.
sample_df_artifact = make_test_artifact(Dataset)(uri=output_path)
# call component python_func by passing the artifact yourself
sample_df_component.python_func(dataset=sample_df_artifact)
# the artifact object is now altered with the new path that you define in sample_df_component (".csv" extension added)
return sample_df_artifact该夹具返回一个工件对象,引用示例数据被保存到的选定的本地路径。
tests/test_component.py:您的实际组件测试再一次,我们的想法是构建I/O工件并调用组件的python_func
from kfp.v2.dsl import Dataset
import pandas as pd
from src.double_ages_component import double_ages
from tests.utils import make_test_artifact
def test_double_ages(sample_df):
expected_df = pd.DataFrame({
'name': ['Ana', 'Maria', 'Josh'],
'age': [30, 38, 44],
})
# input artifact is passed in parameter via sample_df fixture
# create output artifact
output_path = 'local_test_output_df.csv'
output_df_artifact = make_test_artifact(Dataset)(uri=output_path)
# call component python_func
double_ages.python_func(df_input=sample_df, df_output=output_df_artifact)
# read output data
df = pd.read_csv(output_df_artifact.path)
# write your tests
assert df['age'].tolist() == expected_df['age'].tolist()结果
> pytest
================ test session starts ================
platform linux -- Python 3.8.13, pytest-7.1.3, pluggy-1.0.0
rootdir: /home/USER/code/kfp_tests
collected 1 item
tests/test_component.py . [100%]
================ 1 passed in 0.28s ================https://stackoverflow.com/questions/73953744
复制相似问题