发布于 2021-12-03 20:55:09
针对这个问题,我们将结合几种不同的技术,使这段代码既可测试又具有高度可伸缩性。
理论
在解析原始文件时,您可以考虑以下几个选项:
在我们的例子中,我们可以使用Databricks解析器取得很大的效果。
通常,您还应该避免使用.udf方法,因为它可能被使用,而不是Spark中已有的良好功能。UDF不像本机方法那样具有性能,只有在没有其他选项可用时才应该使用。
UDF掩盖隐藏问题的一个很好的例子是对列内容的字符串操作;虽然从技术上讲,您可以使用UDF来执行诸如拆分和修整字符串之类的事情,但是这些事情已经存在于火花API中,并且比您自己的代码快几个数量级。
设计
我们的设计将使用以下方法:
给分析器上电线
首先,我们需要将.jar添加到转换内部可用的spark_session中。由于最近的改进,这个参数在配置时将允许您在预览/测试和完整构建时使用.jar。以前,这需要一个完整的构建,但现在不是这样。
我们需要转到我们的transforms-python/build.gradle文件并添加两个配置块:
pytest插件condaJars参数并声明.jar依赖项我的/transforms-python/build.gradle现在看起来如下所示:
buildscript {
repositories {
// some other things
}
dependencies {
classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
}
}
apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'
dependencies {
condaJars "com.databricks:spark-xml_2.13:0.14.0"
}
// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'
// ... some other awesome features you should enable应用此配置后,您将希望通过单击底部带状区域并单击Refresh重新启动代码辅助会话

在刷新代码帮助之后,我们现在有了可以解析我们的.xml文件的低级功能,现在我们需要测试它!
测试分析器
如果我们采用与这里相同的测试驱动开发风格,那么我们最终会得到具有以下内容的/transforms-python/src/myproject/datasets/xml_parse_transform.py:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs, how="wide")
return output_df
@transform(
the_output=Output("my.awesome.output"),
the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)..。一个包含内容的示例文件/transforms-python/test/myproject/datasets/sample.xml:
<tag>
<field1>
my_value
</field1>
</tag>和一个测试文件/transforms-python/test/myproject/datasets/test_xml_parse_transform.py
from myproject.datasets import xml_parse_transform
from pkg_resources import resource_filename
def test_parse_xml(spark_session):
file_path = resource_filename(__name__, "sample.xml")
parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
assert parsed_df.count() == 1
assert set(parsed_df.columns) == {"field1"}我们现在有:
.xml解析器,具有高度的可伸缩性。干杯
https://stackoverflow.com/questions/70220574
复制相似问题