首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何处理代码存储库中的大型文件?

如何处理代码存储库中的大型文件?
EN

Stack Overflow用户
提问于 2021-01-13 17:18:48
回答 1查看 849关注 0票数 4

我有一个数据提要,每天提供一个大的.txt文件(50-75GB)。文件中包含几个不同的架构,其中每一行对应于一个架构。我想为每个模式将它分割成分区数据集,我如何有效地做到这一点?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-13 19:10:24

您需要解决的最大问题是恢复模式的迭代速度,对于这样规模的文件来说,这可能是一个挑战。

这里您的最佳策略是获得一个示例“概念”文件,其中包含您想要恢复的每一个模式,并将其作为存储库中的一个文件添加。当您将此文件添加到您的repo中(与您的转换逻辑一起)时,您将能够将它推入dataframe中,就像您使用dataset中的原始文件一样,以便快速测试迭代。

首先,确保将txt文件指定为包内容的一部分,这样您的测试就可以发现它们(这在Read a file from a Python repository下的文档中有介绍):

,您可以将其他文件从存储库读取到转换上下文中。这在设置转换代码引用的参数时可能很有用。

首先,在python存储库中编辑setup.py:

安装程序(名称=os.environ‘’PKG_NAME‘,#.package_data={ '':'*.txt‘}

我使用的是一个包含以下内容的txt文件:

代码语言:javascript
复制
my_column, my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing

此文本文件位于我的存储库中的以下路径:transforms-python/src/myproject/datasets/raw.txt

一旦配置了随逻辑一起提供的文本文件,并且在存储库中包含了该文件本身之后,您就可以包含以下代码。此代码具有以下几个重要功能:

  1. 将原始文件解析逻辑与将文件读入火花DataFrame的阶段完全分开。这样就可以将这个DataFrame的构造方式留给测试基础结构或运行时,取决于您正在运行的位置。
  2. --保持逻辑分隔--使您能够确保您想要执行的实际逐行解析是它自己的可测试函数,而不是让它完全驻留在您的DataFrame代码中,而是使用Spark-本机spark_session.read.text方法,它将比原始txt文件逐行解析快数量级。这将确保并行化的DataFrame是您所操作的,而不是在您的执行器(或者更糟的是您的驱动程序)中逐行运行的一个文件。

代码语言:javascript
复制
from transforms.api import transform, Input, Output
from pkg_resources import resource_filename


def raw_parsing_logic(raw_df):
    return raw_df


@transform(
    my_output=Output("/txt_tests/parsed_files"),
    my_input=Input("/txt_tests/dataset_of_files"),
)
def my_compute_function(my_input, my_output, ctx):
    all_files_df = None
    for file_status in my_input.filesystem().ls('**/**'):
        raw_df = ctx.spark_session.read.text(my_input.filesystem().hadoop_path + "/" + file_status.path)
        parsed_df = raw_parsing_logic(raw_df)
        all_files_df = parsed_df if all_files_df is None else all_files_df.unionByName(parsed_df)
    my_output.write_dataframe(all_files_df)


def test_my_compute_function(spark_session):
    file_path = resource_filename(__name__, "raw.txt")
    raw_df = raw_parsing_logic(
      spark_session.read.text(file_path)
    )
    assert raw_df.count() > 0
    raw_columns_set = set(raw_df.columns)
    expected_columns_set = {"value"}
    assert len(raw_columns_set.intersection(expected_columns_set)) == 1

一旦启动并运行了这些代码,您的test_my_compute_function方法将非常快地迭代,以便完善您的模式恢复逻辑。这将使您在数据集的最后构建变得非常容易,但不需要任何完整构建的开销。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65706708

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档