We are trying to create a test function for an entire transform.
import os
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.api import Pipeline
from .myproject.datasets import my_transform
# This assumes your test data exists in the folder /test/fixtures/data/ within the repo next to this test
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'fixtures', 'data')
def test_my_transform(spark_session):
    pipeline = Pipeline()
    pipeline.add_transforms(my_transform)
    runner = TransformRunner(pipeline, '/my_fabulous_project', TEST_DATA_DIR)
    output = runner.build_dataset(spark_session, '/my_fabulous_project/output/test')
    assert output.first()['col_c'] == 3

Based on the documentation and this post, we have tried modifying the function's imports, but we always end up with one of the following errors:
transforms._errors.TransformTypeError: Expected arguments to be of type <...>
ModuleNotFoundError: No module named 'test.myproject'
ValueError: attempted relative import beyond top-level package
How can we create a working end-to-end test function for a transform?
Posted on 2022-08-29 17:30:25
After trying several approaches under different conditions, the one below looks cleanest to me.
No hard-coded dataset (input) paths.
test_my_transform.py
from transforms.api import Pipeline
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.verbs.testing.datastores import InMemoryDatastore
from myproject.datasets.my_transform import compute_sum


def test_compute_sum(spark_session):
    df_input1 = spark_session.createDataFrame([
        (0, 2)
    ], ['col_a', 'col_b'])

    df_input2 = spark_session.createDataFrame([
        (0, 1)
    ], ['col_a', 'col_b'])

    df_expected = spark_session.createDataFrame([
        (0, 1, 1),
        (0, 2, 2)
    ], ['col_a', 'col_b', 'col_c'])

    # If @transform_df or @transform_pandas, the key is 'bound_output'
    # If @transform, the key is the name of the Output variable
    output_map = {'out': df_expected}
    input_map = {
        'input_a': df_input1,
        'input_b': df_input2,
    }

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)
    store = InMemoryDatastore()

    # Register each input dataframe in the in-memory datastore under the input's alias (its Foundry path)
    for inp_name, inp_obj in pipeline.transforms[0].inputs.items():
        store.store_dataframe(inp_obj.alias, input_map[inp_name])

    path_out = pipeline.transforms[0].outputs[list(output_map)[0]].alias

    runner = TransformRunner(pipeline, datastore=store)
    df_out = runner.build_dataset(spark_session, path_out)

    assert df_out.subtract(df_expected).count() == 0
    assert df_expected.subtract(df_out).count() == 0
    assert df_out.schema == df_expected.schema

my_transform.py
from transforms.api import Input, Output, transform
from pyspark.sql import functions as F


@transform(
    out=Output('/some_foundry_path/my_dir/out3'),
    input_a=Input('/some_foundry_path/my_dir/in'),
    input_b=Input('/some_foundry_path/my_dir/in2'))
def compute_sum(input_a, input_b, out):
    input_a = input_a.dataframe()
    input_b = input_b.dataframe()
    df = input_a.unionByName(input_b)
    df = df.withColumn('col_c', F.col('col_a') + F.col('col_b'))
    out.write_dataframe(df)
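As the comments in the test above note, the output_map key depends on the decorator: with @transform it is the name of the Output argument ('out' here), while with @transform_df or @transform_pandas it is 'bound_output'. A minimal sketch of what that would look like (the function compute_sum_df and the path /some_foundry_path/my_dir/out_df are hypothetical, for illustration only):

from transforms.api import Input, Output, transform_df
from pyspark.sql import functions as F


# Hypothetical @transform_df variant of the same logic (illustration only)
@transform_df(
    Output('/some_foundry_path/my_dir/out_df'),
    input_a=Input('/some_foundry_path/my_dir/in'))
def compute_sum_df(input_a):
    return input_a.withColumn('col_c', F.col('col_a') + F.col('col_b'))

In the test, the maps would then be keyed as follows:

# The unnamed Output of @transform_df is exposed under the key 'bound_output'
output_map = {'bound_output': df_expected}
input_map = {'input_a': df_input1}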
Posted on 2022-08-03 09:01:46
The transform tests below work for functions decorated with both @transform and @transform_df.
my_transform.py is located in the src/myproject/datasets folder of the repository.
from transforms.api import Input, Output, transform_df
from pyspark.sql import functions as F


@transform_df(
    Output('/some_foundry_path/my_dir/out'),
    input_a=Input('/some_foundry_path/my_dir/in'))
def compute_sum(input_a):
    df = input_a.withColumn('col_c', F.col('col_a') + F.col('col_b'))
    return df

Input file: a small dataset with columns col_a and col_b (preview not reproduced here).
Approach where the test inputs are stored in memory
test_my_transform.py is located in the src/test folder of the repository.
from transforms.api import Pipeline
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.verbs.testing.datastores import InMemoryDatastore
from myproject.datasets.my_transform import compute_sum


def test_compute_sum(spark_session):
    df_in = spark_session.createDataFrame([
        (0, 1)
    ], ['col_a', 'col_b'])

    df_expected = spark_session.createDataFrame([
        (0, 1, 1)
    ], ['col_a', 'col_b', 'col_c'])

    path_in = '/some_foundry_path/my_dir/in'
    path_out = '/some_foundry_path/my_dir/out'

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)
    store = InMemoryDatastore()
    store.store_dataframe(path_in, df_in)

    runner = TransformRunner(pipeline, datastore=store)
    df_out = runner.build_dataset(spark_session, path_out)

    assert df_out.subtract(df_expected).count() == 0
    assert df_expected.subtract(df_out).count() == 0
    assert df_out.schema == df_expected.schema

path_in and path_out are exactly the same as the transform's input and output paths, so this script is easy to follow.
Approach where the test inputs are stored as .csv files in the repository
This is the approach used in the official documentation. It is more verbose, it is less obvious which paths need to be created, and it can be harder to maintain: if a dataset path changes, a new fixture tree may need to be created in the repository. A possible layout is sketched just below.
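To make the required paths concrete, here is one possible repository layout, assuming the paths used in the test and fixture below (an illustration only, not a layout prescribed by Foundry):

src/
    myproject/datasets/my_transform.py
    test/test_my_transform.py
    test/fixtures/data/input/in.csv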
test_my_transform.py is located in the src/test folder of the repository.
from transforms.api import Pipeline
from transforms.verbs.testing.TransformRunner import TransformRunner
import os
from myproject.datasets.my_transform import compute_sum

# Taking this .py file's dir and appending the path to the test data
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'fixtures/data/input')


def test_compute_sum(spark_session):
    path_in_prefix = '/some_foundry_path/my_dir'
    path_out = '/some_foundry_path/my_dir/out'

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)

    runner = TransformRunner(pipeline, path_in_prefix, TEST_DATA_DIR)
    df_out = runner.build_dataset(spark_session, path_out)

    assert df_out.head()['col_c'] == 1

A test CSV file is created in the repository (in.csv, which has the same name, in, as the transform's input):

col_a,col_b
0,1

Note: for all inputs,

the input path used in the transform (/some_foundry_path/my_dir/in)
less
path_in_prefix (/some_foundry_path/my_dir/)

should be equal to

the CSV test file's full path (...src/test/fixtures/data/input/in)
less
TEST_DATA_DIR (...src/test/fixtures/data/input)

(See the sketch just below.)
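A minimal sketch of this path arithmetic in plain Python, only to illustrate the note above (it is not how TransformRunner is implemented internally):

# Strip path_in_prefix from the input's Foundry path and append the remainder
# to TEST_DATA_DIR to get the location where the CSV fixture is expected.
input_path = '/some_foundry_path/my_dir/in'
path_in_prefix = '/some_foundry_path/my_dir'
TEST_DATA_DIR = 'src/test/fixtures/data/input'

relative_part = input_path[len(path_in_prefix):]  # '/in'
fixture_base = TEST_DATA_DIR + relative_part      # 'src/test/fixtures/data/input/in'
print(fixture_base)  # the file on disk is this path plus the .csv extension: .../in.csv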
To have the tests run automatically together with Checks, uncomment the line below in transforms-python/build.gradle.

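(The line itself is not reproduced in this copy. In a standard Foundry Python repository it is typically the commented-out testing plugin, apply plugin: 'com.palantir.transforms.lang.pytest-defaults'; check your own transforms-python/build.gradle, as the exact contents may differ.)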
https://stackoverflow.com/questions/73218988