我有一个数据提要,每天提供一个大的.txt文件(50-75GB)。文件中包含几个不同的架构,其中每一行对应于一个架构。我想为每个模式将它分割成分区数据集,我如何有效地做到这一点?
发布于 2021-01-13 19:10:24
您需要解决的最大问题是恢复模式的迭代速度,对于这样规模的文件来说,这可能是一个挑战。
这里您的最佳策略是获得一个示例“概念”文件,其中包含您想要恢复的每一个模式,并将其作为存储库中的一个文件添加。当您将此文件添加到您的repo中(与您的转换逻辑一起)时,您将能够将它推入dataframe中,就像您使用dataset中的原始文件一样,以便快速测试迭代。
首先,确保将txt文件指定为包内容的一部分,这样您的测试就可以发现它们(这在Read a file from a Python repository下的文档中有介绍):
,您可以将其他文件从存储库读取到转换上下文中。这在设置转换代码引用的参数时可能很有用。
首先,在python存储库中编辑setup.py:
安装程序(名称=os.environ‘’PKG_NAME‘,#.package_data={ '':'*.txt‘}
我使用的是一个包含以下内容的txt文件:
my_column, my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing此文本文件位于我的存储库中的以下路径:transforms-python/src/myproject/datasets/raw.txt
一旦配置了随逻辑一起提供的文本文件,并且在存储库中包含了该文件本身之后,您就可以包含以下代码。此代码具有以下几个重要功能:
spark_session.read.text方法,它将比原始txt文件逐行解析快数量级。这将确保并行化的DataFrame是您所操作的,而不是在您的执行器(或者更糟的是您的驱动程序)中逐行运行的一个文件。from transforms.api import transform, Input, Output
from pkg_resources import resource_filename
def raw_parsing_logic(raw_df):
return raw_df
@transform(
my_output=Output("/txt_tests/parsed_files"),
my_input=Input("/txt_tests/dataset_of_files"),
)
def my_compute_function(my_input, my_output, ctx):
all_files_df = None
for file_status in my_input.filesystem().ls('**/**'):
raw_df = ctx.spark_session.read.text(my_input.filesystem().hadoop_path + "/" + file_status.path)
parsed_df = raw_parsing_logic(raw_df)
all_files_df = parsed_df if all_files_df is None else all_files_df.unionByName(parsed_df)
my_output.write_dataframe(all_files_df)
def test_my_compute_function(spark_session):
file_path = resource_filename(__name__, "raw.txt")
raw_df = raw_parsing_logic(
spark_session.read.text(file_path)
)
assert raw_df.count() > 0
raw_columns_set = set(raw_df.columns)
expected_columns_set = {"value"}
assert len(raw_columns_set.intersection(expected_columns_set)) == 1一旦启动并运行了这些代码,您的test_my_compute_function方法将非常快地迭代,以便完善您的模式恢复逻辑。这将使您在数据集的最后构建变得非常容易,但不需要任何完整构建的开销。
https://stackoverflow.com/questions/65706708
复制相似问题