I have data in a comma-separated file, which I have loaded into a Spark DataFrame. The data looks like this:
A B C
1 2 3
4 5 6
7 8 9
I want to use pyspark to transform the above DataFrame in Spark into:
A B C
A_1 B_2 C_3
A_4 B_5 C_6
--------------
Then, using pyspark, convert it into a list of lists:
[[A_1, B_2, C_3], [A_4, B_5, C_6]]
Then run the FP-Growth algorithm on that dataset with pyspark.
The code I have tried so far is:
from pyspark.sql.functions import col, size
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import Row
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/data.csv")
names = df.schema.names
Then I thought of doing something in a for loop:
for name in names:
    -----
    ------
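For example, one thing the loop could do (just a sketch of the idea; the withColumn/concat approach below is my guess, not code I have verified) is prefix every value with its column name:
from pyspark.sql.functions import col, concat, lit
# Sketch: turn 1 into A_1, 2 into B_2, and so on, column by column.
for name in names:
    df = df.withColumn(name, concat(lit(name), lit("_"), col(name)))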
After that, I would use fpgrowth:
df = spark.createDataFrame([
    (0, ["A_1", "B_2", "C_3"]),
    (1, ["A_4", "B_5", "C_6"])
], ["id", "items"])
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)
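After fitting, I would expect to inspect the results roughly like this (a sketch, assuming the fit above succeeds; freqItemsets, associationRules and transform are the standard pyspark.ml.fpm FPGrowthModel members):
# Sketch: inspect the fitted FP-Growth model.
model.freqItemsets.show()         # frequent itemsets with their counts
model.associationRules.show()     # rules meeting the minConfidence threshold
model.transform(df).show()        # per-row consequents predicted from the rules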
Posted on 2019-08-12 22:14:33
Here are a few concepts for those who normally use Scala, showing how to do this with pyspark. It is somewhat different, but you will certainly learn something from it, although how much is a big question. I learned a few things about pyspark's zipWithIndex myself. Anyway.
The first part is just getting things into the desired format; there are probably too many imports, but I am leaving them as is:
from functools import reduce
from pyspark.sql.functions import lower, col, lit, concat, split
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import functions as f
source_df = spark.createDataFrame(
    [
        (1, 11, 111),
        (2, 22, 222)
    ],
    ["colA", "colB", "colC"]
)
# Prefix every value with its column name, e.g. 1 -> colA_1.
intermediate_df = reduce(
    lambda df, col_name: df.withColumn(col_name, concat(lit(col_name), lit("_"), col(col_name))),
    source_df.columns,
    source_df
)
# Concatenate all columns into one comma-separated string, then split it back into an array column.
allCols = [x for x in intermediate_df.columns]
result_df = intermediate_df.select(f.concat_ws(',', *allCols).alias('CONCAT_COLS'))
result_df = result_df.select(split(col("CONCAT_COLS"), r",\s*").alias("ARRAY_COLS"))
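# Note (my own aside, not from the original answer): the concat_ws/split round trip could be
# replaced by building the array column directly, e.g.:
#   result_df = intermediate_df.select(f.array(*allCols).alias("ARRAY_COLS"))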
# Add 0,1,2,3, ... with zipWithIndex, we add it at back, but that does not matter, you can move it around.
# Get the new structure: the existing fields (one in this case, but done flexibly), plus the zipWithIndex value.
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])
# Need this dict approach with pyspark, different to Scala.
rdd = result_df.rdd.zipWithIndex()
rdd1 = rdd.map(
    lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],)
)
final_result_df = spark.createDataFrame(rdd1, schema)
final_result_df.show(truncate=False)
This returns:
+---------------------------+-----+
|ARRAY_COLS                 |index|
+---------------------------+-----+
|[colA_1, colB_11, colC_111]|0    |
|[colA_2, colB_22, colC_222]|1    |
+---------------------------+-----+
The second part is the old zipWithIndex. Compared to Scala, it is painful if you need the 0,1,2,... indices.
In general, this is easier to solve in Scala.
Not sure about performance; it is not a foldLeft, which is interesting. I think it is actually fine.
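To tie this back to the question, the ARRAY_COLS column can be collected into a plain list of lists, or fed straight to FPGrowth as the items column. A minimal sketch, assuming the final_result_df built above and the minSupport/minConfidence values from the question:
# Plain Python list of lists, e.g. [['colA_1', 'colB_11', 'colC_111'], ...]
list_of_lists = [row["ARRAY_COLS"] for row in final_result_df.collect()]
print(list_of_lists)
# Or run FP-Growth directly on the array column.
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="ARRAY_COLS", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(final_result_df)
model.freqItemsets.show(truncate=False)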
https://stackoverflow.com/questions/57459741