所以,我在Spark中有一个DataFrame,它看起来像这样:
它有30列:只显示其中的一部分!
[ABCD,color,NORMAL,N,2015-02-20,1]
[XYZA,color,NORMAL,N,2015-05-04,1]
[GFFD,color,NORMAL,N,2015-07-03,1]
[NAAS,color,NORMAL,N,2015-08-26,1]
[LOWW,color,NORMAL,N,2015-09-26,1]
[KARA,color,NORMAL,N,2015-11-08,1]
[ALEQ,color,NORMAL,N,2015-12-04,1]
[VDDE,size,NORMAL,N,2015-12-23,1]
[QWER,color,NORMAL,N,2016-01-18,1]
[KDSS,color,NORMAL,Y,2015-08-29,1]
[KSDS,color,NORMAL,Y,2015-08-29,1]
[ADSS,color,NORMAL,Y,2015-08-29,1]
[BDSS,runn,NORMAL,Y,2015-08-29,1]
[EDSS,color,NORMAL,Y,2015-08-29,1]因此,我必须在Scala中将该dataFrame转换为键值对,使用该键作为数据帧中的一些列,并为这些键分配从索引0到计数(不同数量的键)的唯一值。
例如:使用上面的例子,我希望在Scala中的map(键-值)集合中有一个输出,如下所示:
([ABC_color_NORMAL_N_1->0]
[XYZA_color_NORMAL_N_1->1]
[GFFD_color_NORMAL_N_1->2]
[NAAS_color_NORMAL_N_1->3]
[LOWW_color_NORMAL_N_1->4]
[KARA_color_NORMAL_N_1->5]
[ALEQ_color_NORMAL_N_1->6]
[VDDE_size_NORMAL_N_1->7]
[QWER_color_NORMAL_N_1->8]
[KDSS_color_NORMAL_Y_1->9]
[KSDS_color_NORMAL_Y_1->10]
[ADSS_color_NORMAL_Y_1->11]
[BDSS_runn_NORMAL_Y_1->12]
[EDSS_color_NORMAL_Y_1->13]
)我是Scala和Spark的新手,我尝试过这样做。
var map: Map[String, Int] = Map()
var i = 0
dataframe.foreach( record =>{
//Is there a better way of creating a key!
val key = record(0) + record(1) + record(2) + record(3)
var index = i
map += (key -> index)
i+=1
}
)但是,这不起作用。:/此操作完成后,映射为空。
发布于 2016-03-27 04:01:57
代码中的主要问题是试图在workers上执行的代码中修改在驱动程序端创建的变量。当使用Spark时,您只能在RDD转换中使用驱动器端变量作为“只读”值。
具体地说:
foreach完成时被丢弃-结果不会发送回驱动程序。要解决这个问题-您应该选择一个转换,返回一个更改过的RDD (例如map)来创建密钥,使用zipWithIndex添加正在运行的"ids",然后使用collectAsMap将所有数据作为映射返回给驱动程序:
val result: Map[String, Long] = dataframe
.map(record => record(0) + record(1) + record(2) + record(3))
.zipWithIndex()
.collectAsMap()至于键创建本身-假设您想要包括前5列,并在它们之间添加分隔符(_),您可以使用:
record => record.toList.take(5).mkString("_")https://stackoverflow.com/questions/36239791
复制相似问题