我有一个数据集,它是Palantir Foundry代码库中定义的Python转换的输出。它有特定的列,但考虑到随着时间的推移,数据可能会发生变化,我希望验证这些列(大约73)在未来保持。
如何创建数据健康预期或检查,以确保所有73列在未来?
发布于 2022-07-19 12:54:22
您可以使用期望值来断言输出架构中存在哪些列。
有3种模式期望:
# Assert some columns exist.
E.schema().contains({'col1': type1, 'col2': type2})
# Assert the schema contains only columns from the given set (but not necessarily all of them).
E.schema().is_subset_of({'col1': type1, 'col2': type2})
# Assert the schema contains exactly the given columns.
E.schema().equals({'col1': type1, 'col2': type2})此外,对于检查单个列,可以使用E.col('col1').exists()。但是对于73列,最好使用E.schema()。
因此,对于一个更充实的例子,您可能有如下内容:
from transforms.api import transform_df, Check, Input, Output
import transforms.expectations as E
from pyspark.sql import types as T
COLUMNS_WHICH_MUST_EXIST = {
'string_column': T.StringType(),
'number_column': T.IntegerType(),
# ...and 71 more.
}
@transform_df(
Output("ri.foundry.main.dataset.abcdef", checks=[
Check(E.schema().contains(COLUMNS_WHICH_MUST_EXIST), "contains important columns"),
]),
input_data=Input("ri.foundry.main.dataset.12345678"),
)
def compute(input_data):
# ... your logic here有关可用选项的更多详细信息,请参见期望值检查的官方文件。
https://stackoverflow.com/questions/73011476
复制相似问题