我使用Spark并行处理一些执行数据提取并返回熊猫数据的现有代码。我想把这些熊猫的数据转换成一个或多个火花数据。
注:现有代码非常复杂(涉及调用本机库等),因此将其直接移植到Spark代码并不是一种选择。
下面是一个简化的代码示例:
import pandas as pd
def extract_df(s):
# Lots of existing code that returns a large pandas dataframe
# ...
return pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]})
sRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
dfsRDD = sRDD.map(lambda s: extract_df(s))我知道我可以通过收集驱动程序将datesRDD转换成Spark。
spark.createDataFrame(pd.concat(rdd.collect(), ignore_index=True)).show()但是,这当然要求我可以将Pandas的全部数据存储在内存中,而我不能。
目前,我正在S3上为json编写,然后使用Spark阅读,但这需要大量的存储。
有什么方法可以让斯派克在执行器本身上转换成DataFrame/RDD?还是我错过了另一种方法?
发布于 2016-10-12 13:29:34
好样的,flatMap来救你!
import pandas as pd
def extract_df(s):
# Lots of existing code that returns a **huge** pandas dataframe
# ...
df = pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]})
return df.values.tolist()
datesRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
dfsRDD = datesRDD.flatMap(lambda s: extract_df(s))
spark.createDataFrame(dfsRDD, schema=['x', 'y', 'z']).show()
+---+---+---+
| x| y| z|
+---+---+---+
| A| 1| 4|
| A| 2| 5|
| A| 3| 6|
| B| 1| 4|
| B| 2| 5|
| B| 3| 6|
| C| 1| 4|
| C| 2| 5|
| C| 3| 6|
+---+---+---+https://stackoverflow.com/questions/39999121
复制相似问题