首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何测试Palantir铸造公司的改造?

如何测试Palantir铸造公司的改造?
EN

Stack Overflow用户
提问于 2022-08-03 09:01:46
回答 2查看 394关注 0票数 1

我们尝试为整个转换创建一个测试函数。

代码语言:javascript
复制
import os
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.api import Pipeline
from .myproject.datasets import my_transform

# This assumes your test data exists in the folder /test/fixtures/data/ within the repo next to this test
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'fixtures', 'data')

def test_my_transform(spark_session):
    pipeline = Pipeline()
    pipeline.add_transforms(my_transform)

    runner = TransformRunner(pipeline, '/my_fabulous_project', TEST_DATA_DIR)

    output = runner.build_dataset(spark_session, '/my_fabulous_project/output/test')
    assert output.first()['col_c'] == 3

基于文档和this post,我们尝试修改函数的导入,但是我们总是得到以下错误之一:

transforms._errors.TransformTypeError:类型为 ModuleNotFoundError:没有名为'test.myproject‘>ValueError的模块:在顶层包之外尝试相对导入的参数

如何为转换创建一个工作的端到端测试功能?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-08-29 17:30:25

在尝试了几种不同条件的方法之后,下面的方法在我看来是最干净的。

没有硬编码路径的数据集( inputs

  • in-memory

  • )非常明确地说明如何添加/删除转换,

  • 数据作为测试输入(

)。

test_my_transform.py

代码语言:javascript
复制
from transforms.api import Pipeline
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.verbs.testing.datastores import InMemoryDatastore
from myproject.datasets.my_transform import compute_sum


def test_compute_sum(spark_session):

    df_input1 = spark_session.createDataFrame([
        (0, 2)
    ], ['col_a', 'col_b'])

    df_input2 = spark_session.createDataFrame([
        (0, 1)
    ], ['col_a', 'col_b'])

    df_expected = spark_session.createDataFrame([
        (0, 1, 1),
        (0, 2, 2)
    ], ['col_a', 'col_b', 'col_c'])

    # If @transform_df or @transform_pandas, the key is 'bound_output'
    # If @transform, the key is the name of variable Output
    output_map = {'out': df_expected}
    input_map = {
        'input_a': df_input1,
        'input_b': df_input2,
    }

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)
    store = InMemoryDatastore()
    for inp_name, inp_obj in pipeline.transforms[0].inputs.items():
        store.store_dataframe(inp_obj.alias, input_map[inp_name])
    path_out = pipeline.transforms[0].outputs[list(output_map)[0]].alias
    runner = TransformRunner(pipeline, datastore=store)
    df_out = runner.build_dataset(spark_session, path_out)

    assert df_out.subtract(df_expected).count() == 0
    assert df_expected.subtract(df_out).count() == 0
    assert df_out.schema == df_expected.schema

my_transform.py

代码语言:javascript
复制
from transforms.api import Input, Output, transform
from pyspark.sql import functions as F


@transform(
    out=Output('/some_foundry_path/my_dir/out3'),
    input_a=Input('/some_foundry_path/my_dir/in'),
    input_b=Input('/some_foundry_path/my_dir/in2'))
def compute_sum(input_a, input_b, out):
    input_a = input_a.dataframe()
    input_b = input_b.dataframe()
    df = input_a.unionByName(input_b)
    df = df.withColumn('col_c', F.col('col_a') + F.col('col_b'))
    out.write_dataframe(df)
票数 1
EN

Stack Overflow用户

发布于 2022-08-03 09:01:46

下面的转换测试适用于使用@transform@transform_df修饰的函数。

my_transform.py位于src/myproject/datasets文件夹中的存储库中。

代码语言:javascript
复制
from transforms.api import Input, Output, transform_df
from pyspark.sql import functions as F


@transform_df(
    Output('/some_foundry_path/my_dir/out'),
    input_a=Input('/some_foundry_path/my_dir/in'))
def compute_sum(input_a):
    df = input_a.withColumn('col_c', F.col('col_a') + F.col('col_b'))
    return df

输入文件:

将测试输入存储在内存中的方法()

test_my_transform.py位于src/test文件夹中的存储库中。

代码语言:javascript
复制
from transforms.api import Pipeline
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.verbs.testing.datastores import InMemoryDatastore
from myproject.datasets.my_transform import compute_sum


def test_compute_sum(spark_session):

    df_in = spark_session.createDataFrame([
        (0, 1)
    ], ['col_a', 'col_b'])

    df_expected = spark_session.createDataFrame([
        (0, 1, 1)
    ], ['col_a', 'col_b', 'col_c'])

    path_in = '/some_foundry_path/my_dir/in'
    path_out = '/some_foundry_path/my_dir/out'

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)
    store = InMemoryDatastore()
    store.store_dataframe(path_in, df_in)
    runner = TransformRunner(pipeline, datastore=store)
    df_out = runner.build_dataset(spark_session, path_out)

    assert df_out.subtract(df_expected).count() == 0
    assert df_expected.subtract(df_out).count() == 0
    assert df_out.schema == df_expected.schema

path_inpath_out与转换的输入和输出路径完全相同。所以很容易遵循这个脚本。

方法,其中测试输入存储在存储库中的.csv中

这一办法在正式文件中有。更详细的是,不太容易理解应该创建哪些路径,而且可能很难维护:如果数据集路径发生变化,可能需要创建一个新的存储库树。

test_my_transform.py位于src/test文件夹中的存储库中。

代码语言:javascript
复制
from transforms.api import Pipeline
from transforms.verbs.testing.TransformRunner import TransformRunner
import os
from myproject.datasets.my_transform import compute_sum

# Taking this .py file's dir and appending the path to the test data
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'fixtures/data/input')


def test_compute_sum(spark_session):

    path_in_prefix = '/some_foundry_path/my_dir'
    path_out = '/some_foundry_path/my_dir/out'

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)
    runner = TransformRunner(pipeline, path_in_prefix, TEST_DATA_DIR)
    df_out = runner.build_dataset(spark_session, path_out)

    assert df_out.head()['col_c'] == 1

在存储库中创建了测试CSV文件(in.csv -它与转换输入具有相同的名称in ):

代码语言:javascript
复制
col_a,col_b
0,1

注意:

用于所有输入的

输入路径(/some_foundry_path/my_dir/in)

较少

path_in_prefix (/some_foundry_path/my_dir/) 应等于

CSV测试文件全路径(...src/test/fixtures/data/input/in)

较少

TEST_DATA_DIR (...src/test/fixtures/data/input)

使测试与检查一起自动运行,在transforms-python/build.gradle中取消注释下面的行

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73218988

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档