首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Palantir Foundry中解析xml文档?

如何在Palantir Foundry中解析xml文档?
EN

Stack Overflow用户
提问于 2021-12-03 20:55:09
回答 1查看 776关注 0票数 6

我有一组要解析的.xml文档。

我以前曾尝试使用获取文件内容并将其转储到单个单元格的方法来解析它们,但是我注意到这在实践中行不通,因为我看到运行时间越来越慢,通常需要几十个小时才能运行:

我的第一个转换接受.xml内容并将其放入单个单元格中,第二个转换接受这个字符串并使用Python的xml库将字符串解析为文档。然后,我可以从DataFrame中提取属性并返回该文档。

我使用一个UDF来执行将字符串内容映射到我想要的字段的过程。

如何使这更快/更好地处理大型.xml文件?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-03 20:55:09

针对这个问题,我们将结合几种不同的技术,使这段代码既可测试又具有高度可伸缩性。

理论

在解析原始文件时,您可以考虑以下几个选项:

  1. ❌,您可以编写自己的解析器,从文件中读取字节,并将它们转换为可以理解的数据火花。
    • 在可能的情况下,由于工程时间和不可伸缩的体系结构,这是非常不鼓励的。当您这样做时,它不会利用分布式计算,因为您必须将整个原始文件带到解析方法中,然后才能使用它。这不是对你资源的有效利用。

  2. 您可以使用您自己的解析器库,而不是为创建的,例如问题⚠中提到的XML库。
    • 虽然这比编写自己的解析器要困难得多,但它仍然没有利用Spark中的分布式计算。它更容易运行,但它最终将达到性能的限制,因为它没有利用只有在编写星火库时才公开的低级别星火功能。

  3. 可以使用星火本机原始文件解析器
    • 在所有情况下,这都是首选选项,因为它利用了低级别的火花功能,并且不需要您编写自己的代码。如果存在低级别的火花解析器,则应该使用它.

在我们的例子中,我们可以使用Databricks解析器取得很大的效果。

通常,您还应该避免使用.udf方法,因为它可能被使用,而不是Spark中已有的良好功能。UDF不像本机方法那样具有性能,只有在没有其他选项可用时才应该使用。

UDF掩盖隐藏问题的一个很好的例子是对列内容的字符串操作;虽然从技术上讲,您可以使用UDF来执行诸如拆分和修整字符串之类的事情,但是这些事情已经存在于火花API中,并且比您自己的代码快几个数量级。

设计

我们的设计将使用以下方法:

  1. 通过Databricks XML解析器完成的低级别火花优化文件解析
  2. 测试驱动的原始文件解析(解释为这里 )

给分析器上电线

首先,我们需要将.jar添加到转换内部可用的spark_session中。由于最近的改进,这个参数在配置时将允许您在预览/测试和完整构建时使用.jar。以前,这需要一个完整的构建,但现在不是这样。

我们需要转到我们的transforms-python/build.gradle文件并添加两个配置块:

  1. 启用pytest插件
  2. 启用condaJars参数并声明.jar依赖项

我的/transforms-python/build.gradle现在看起来如下所示:

代码语言:javascript
复制
buildscript {
    repositories {
       // some other things
    }

    dependencies {
        classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
    }
}

apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

dependencies {
    condaJars "com.databricks:spark-xml_2.13:0.14.0"
}

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

// ... some other awesome features you should enable

应用此配置后,您将希望通过单击底部带状区域并单击Refresh重新启动代码辅助会话

在刷新代码帮助之后,我们现在有了可以解析我们的.xml文件的低级功能,现在我们需要测试它!

测试分析器

如果我们采用与这里相同的测试驱动开发风格,那么我们最终会得到具有以下内容的/transforms-python/src/myproject/datasets/xml_parse_transform.py

代码语言:javascript
复制
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

..。一个包含内容的示例文件/transforms-python/test/myproject/datasets/sample.xml

代码语言:javascript
复制
<tag>
<field1>
my_value
</field1>
</tag>

和一个测试文件/transforms-python/test/myproject/datasets/test_xml_parse_transform.py

代码语言:javascript
复制
from myproject.datasets import xml_parse_transform
from pkg_resources import resource_filename


def test_parse_xml(spark_session):
    file_path = resource_filename(__name__, "sample.xml")
    parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"field1"}

我们现在有:

  1. 一种分布式计算的低级别.xml解析器,具有高度的可伸缩性。
  2. 一个测试驱动的设置,我们可以快速迭代以获得正确的功能。

干杯

票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70220574

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档