首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Spark将列名附加到列值

使用Spark将列名附加到列值
EN

Stack Overflow用户
提问于 2019-08-12 18:44:18
回答 1查看 1.6K关注 0票数 1

我在逗号分隔的文件中有数据,我已经将其加载到spark数据框中:数据如下:

代码语言:javascript
复制
  A B C
  1 2 3
  4 5 6
  7 8 9

我想在spark中使用pyspark将上面的数据帧转换为:

代码语言:javascript
复制
   A    B   C
  A_1  B_2  C_3
  A_4  B_5  C_6
  --------------

然后使用pyspark将其转换为list of list:

代码语言:javascript
复制
[[ A_1 , B_2 , C_3],[A_4 , B_5 , C_6]]

然后在上述数据集上使用pyspark运行FP增长算法。

我尝试过的代码如下:

代码语言:javascript
复制
from pyspark.sql.functions import col, size
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import Row
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StringType
from pyspark import SQLContext

sqlContext = SQLContext(sc)
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/data.csv")

 names=df.schema.names

然后我想到了在for循环中做一些事情:

代码语言:javascript
复制
 for name in names:
      -----
      ------

在此之后,我将使用fpgrowth:

代码语言:javascript
复制
df = spark.createDataFrame([
    (0, [ A_1 , B_2 , C_3]),
    (1, [A_4 , B_5 , C_6]),)], ["id", "items"])

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-08-12 22:14:33

这里为那些通常使用Scala的人提供了一些概念,展示了如何使用pyspark。有些不同,但肯定会学到一些东西,尽管有多少是个大问题。我自己也在zipWithIndex上学到了一些关于pyspark的知识。不管怎么说。

第一部分是将内容转换为所需的格式,可能也会导入,但保留原样:

代码语言:javascript
复制
from functools import reduce
from pyspark.sql.functions import lower, col, lit, concat, split
from pyspark.sql.types import * 
from pyspark.sql import Row
from pyspark.sql import functions as f

source_df = spark.createDataFrame(
   [
    (1, 11, 111),
    (2, 22, 222)
   ],
   ["colA", "colB", "colC"]
                                 )

intermediate_df = (reduce(
                    lambda df, col_name: df.withColumn(col_name, concat(lit(col_name), lit("_"), col(col_name))),
                    source_df.columns,
                    source_df
                   )     )

allCols = [x for x in intermediate_df.columns]
result_df = intermediate_df.select(f.concat_ws(',', *allCols).alias('CONCAT_COLS'))

result_df = result_df.select(split(col("CONCAT_COLS"), ",\s*").alias("ARRAY_COLS"))

# Add 0,1,2,3, ... with zipWithIndex, we add it at back, but that does not matter, you can move it around.
# Get new Structure, the fields (one in this case but done flexibly, plus zipWithIndex value.
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])

# Need this dict approach with pyspark, different to Scala.
rdd = result_df.rdd.zipWithIndex()
rdd1 = rdd.map(
               lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],)
              )

final_result_df = spark.createDataFrame(rdd1, schema)
final_result_df.show(truncate=False)

返回:

代码语言:javascript
复制
 +---------------------------+-----+
 |ARRAY_COLS                 |index|
 +---------------------------+-----+
 |[colA_1, colB_11, colC_111]|0    |
 |[colA_2, colB_22, colC_222]|1    |
 +---------------------------+-----+

第二部分是旧的zipWithIndex,如果你需要0,1,..与Scala相比,这很痛苦。

通常,在Scala中更容易解决。

不确定性能,不是foldLeft,很有趣。我觉得其实还可以。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57459741

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档