我在中有一些JSON文件,其中包含大量数据(介于500GiB和1TiB之间)。这些文件每行包含1个JSON对象,格式如下:
{"country":"US", "col1":"val1", "col2":"val2", "col3":"val3"}
{"country":"CA", "col1":"val4", "col2":"val5", "col3":"val6"}我的目标是在BigQuery中为我可以在这些数据中找到的10个国家制作不同的表格。因此,我将得到10个表,例如,其中一个将命名为data_us,其模式为:col1,col2,col3。
我目前的做法是使用PySpark并在Google上的机器集群上运行作业:
data = spark.read.json(bucket_source)
data.createOrReplaceTempView('data')
for c in country_list:
table_name = "data_{}".format(c)
query = "select col1, col2, col3, from data where language = '{}'".format(c)
result_folder = "result_{}".format(c)
result = spark.sql(query)
push_bigquery(bucket_dest, cluster_name, project_name, dataset_name, result, result_folder, table_name)基本上,我只是加载数据,创建一个视图,并要求PySpark为每个国家运行1个请求。然后我调用push_bigquery函数,它只是将结果转储到CSV文件并将它们加载到BigQuery中。这个解决方案可以工作,但是对于大量的数据(对于数据大小接近1 1TiB的大约12小时),它看起来有点慢。
我有两个问题:
完全不同的更好的方法来完成这个任务?
谢谢你的帮助
发布于 2020-05-10 10:47:07
我错过了.cache,但以下是基于N country <-> N表要求的第一次尝试:
df.repartition(country).write...partitionBy(country)...读取、重新划分和写出非拼板(不需要柱状)形式,并提供适当的选项。然后,根据分区意识,发布于 2020-05-10 12:50:52
您可以使用@theBlue幻象提供的代码对其进行优化,并通过这样做进行更多的优化。
df.repartition(country).write...partitionBy(country)在保存数据后,现在您将拥有每个国家的单独文件夹。因此,在运行bq命令时,可以在这些文件夹上创建国家级表。这样,您就不需要做任何进一步的处理了,您将把数据从所有大型查询表中分离出来。
https://stackoverflow.com/questions/61710185
复制相似问题