I have the table below. To perform a join, I am generating a number sequence as a rowId column for it, but this throws the error below. What am I doing wrong? Please help me with this.
fListVec: org.apache.spark.sql.DataFrame = [features: vector]
+-----------------------------------------------------------------------------+
|features |
+-----------------------------------------------------------------------------+
|[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]|
|[0.9558040000000001,0.9843780000000002,0.545025,0.9979860000000002] |
+-----------------------------------------------------------------------------+

Code:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
val fListrdd = fListVec.rdd
.map{case Row(features: Vector) => features}
.zipWithIndex()
.toDF("features","rowId")
fListrdd.createOrReplaceTempView("featuresTable")
val f = spark.sql("SELECT features, rowId from featuresTable")
f.show(false)

Output:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 206.0 failed 1 times, most recent failure: Lost task 0.0 in stage 206.0 (TID 1718, localhost, executor driver): scala.MatchError: [2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]
  at $anonfun$1.apply(<console>:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.rdd.ZippedWithIndexRDD.<init>(ZippedWithIndexRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
  at org.apache.spark.rdd.RDD.zipWithIndex(RDD.scala:1292)
Caused by: scala.MatchError: [2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]
  at $anonfun$1.apply(<console>:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  ... 3 more
Expected output:

features                 | rowId
[2.5046410000000003,...] | 0
[0.9558040000000001,...] | 1

Posted on 2017-11-30 07:50:31
You have to write a map function in between, to define the dataType for the new dataframe you want to create:
import org.apache.spark.mllib.linalg.DenseVector  // needed for the asInstanceOf cast below

val fListrdd = fListVec.rdd
.map{case Row(features) => features}
.zipWithIndex()
.map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt))
.toDF("features","rowId")

where only the line .map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt)) has been added.
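The rowId values themselves come from zipWithIndex, which simply pairs each element with its position. A plain-Scala analogue of what the RDD pipeline does (no Spark required; the object and method names are illustrative only):

```scala
// Plain-Scala analogue of RDD.zipWithIndex: each element is paired with
// its position, which becomes the rowId column after toDF.
object ZipWithIndexDemo {
  def withRowId(features: Seq[Array[Double]]): Seq[(Array[Double], Int)] =
    features.zipWithIndex

  def main(args: Array[String]): Unit = {
    val rows = withRowId(Seq(
      Array(2.504641, 2.148715, 1.088487, 3.587709),
      Array(0.955804, 0.984378, 0.545025, 0.997986)
    ))
    rows.foreach { case (f, i) => println(s"rowId=$i features=${f.mkString(",")}") }
  }
}
```

Note that on a real RDD the index is a Long and assigning it triggers a Spark job, which is why the error above surfaces inside ZippedWithIndexRDD.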
You can go a step further and create a dataset. I personally recommend datasets, since datasets are type-safe and an optimized form of dataframes.
For that you need a case class:

case class features(features: DenseVector, rowId: Int)

Just add the features word to the solution above and call the .toDS api to create a type-safe dataset:
val fListDS = fListVec.rdd
.map{case Row(features: DenseVector) => features}
.zipWithIndex()
.map(x => features(x._1.asInstanceOf[DenseVector], x._2.toInt))
.toDS

Posted on 2017-11-30 08:20:21
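As background on why the asker's original pattern match blew up: a typed pattern inside a partial function throws scala.MatchError whenever the runtime class of the value does not conform to the annotated type, which is exactly what happens when the imported Vector type differs from the one actually stored in the Row. A minimal, Spark-free sketch of this behavior (the object and helper names are made up for illustration):

```scala
object MatchErrorDemo {
  // Returns the matched String, or the MatchError's message when the
  // runtime type does not conform to the typed pattern.
  def extract(x: Any): Either[String, String] = {
    val f: Any => String = { case s: String => s }  // typed pattern, like Row(features: Vector)
    try Right(f(x))
    catch { case e: MatchError => Left(String.valueOf(e.getMessage)) }
  }

  def main(args: Array[String]): Unit = {
    println(extract("ok"))               // Right(ok): runtime type matches
    println(extract(Vector(1.0, 2.0)))   // Left(...): scala.MatchError
  }
}
```

The same mechanism explains why matching on the correct concrete type, as the answer below does, makes the error disappear.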
Almost there: you just need to specify the proper vector type, DenseVector:
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.Row
val fList = Seq(
(Seq(2.5046410000000003, 2.1487149999999997, 1.0884870000000002, 3.5877090000000003)),
(Seq(0.9558040000000001, 0.9843780000000002, 0.545025, 0.9979860000000002))
).toDF("features")
def seqToVec = udf(
(s: Seq[Double]) => new DenseVector(s.toArray)
)
val fListVec = fList.withColumn("features", seqToVec($"features"))
// fListVec: org.apache.spark.sql.DataFrame = [features: vector]
val fListrdd = fListVec.rdd.
map{ case Row(features: DenseVector) => features }.
zipWithIndex.
toDF("features", "rowId")
fListrdd.show
// +--------------------+-----+
// | features|rowId|
// +--------------------+-----+
// |[2.50464100000000...| 0|
// |[0.95580400000000...| 1|
// +--------------------+-----+

Source: https://stackoverflow.com/questions/47567271