我正在尝试对一个40 in大小的文件对pandas执行pivot_table操作。
这需要很多时间来运行(1.5小时)。我在hadoop集群上执行此操作,方法是将spark数据帧转换为pandas并使用pyarrow。但是使用pandas,我相信这个操作只在那个特定的节点上运行。最后,我再次将其转换为spark数据帧。
输入:
+---------+-----------------+-----------------+------+------------------+------------------+------------------+---------+------------------+
| uniqueid | Measure1_month1 | Measure1_month2 | .... | Measure1_month72 | Measure2_month_1 | Measure2_month_2 | ….so on | Measure2_month72 |
+---------+-----------------+-----------------+------+------------------+------------------+------------------+---------+------------------+
| 1 | 10 | 20 | …. | 500 | 40 | 50 | … | |
| 2 | 20 | 40 | …. | 800 | 70 | 150 | … | |
+---------+-----------------+-----------------+------+------------------+------------------+------------------+---------+------------------+输出:
+---------+-------+----------+----------+
| uniqueid| Month | Measure1 | Measure2 |
+---------+-------+----------+----------+
| 1 | 1 | 10 | 30 |
| 1 | 2 | 20 | 40 |
| 1 | 3 | 30 | 80 |
| 1 | 4 | 70 | 90 |
| 1 | 5 | 40 | 100 |
| . | . | . | . |
| . | . | . | . |
| 1 | 72 | 700 | 50 |
+---------+-------+----------+----------+代码:
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df_input = spark.sql("select * from inputtable")
df_input_pd=df_input.toPandas()
d = df_input_pd.set_index('uniqueid')
d.columns = d.columns.str.replace('m\_', 'm').str.split('_', expand=True)
u = d.stack((0, 1)).rename_axis(
['uniqueid', 'Measure', 'Month']).to_frame('Value').reset_index()
f2 = u.pivot_table(index=['uniqueid', 'Month'], columns='Measure', values='Value', fill_value=0).sort_values(['uniqueid', 'Month'])
spDF = sqlContext.createDataFrame(f2)
spDF.write.insertInto("outputtable",overwrite=False)f2(pivot)和insertinto是耗时最长的操作。有没有办法通过使用pyspark pivot或任何其他操作来优化它?我在一组较小的数据上测试了它,它工作得很好。有没有更好的方法来做到这一点呢?如果有什么不清楚的地方,请告诉我。再次感谢
向您致敬,萨万
发布于 2020-10-08 03:51:01
远离熊猫,使用spark pivot功能:
df.pivot('Measure')*发布一个数据样本,以及您期望的最终结果。
https://stackoverflow.com/questions/64250726
复制相似问题