
Error when generating a number sequence for vector data

Stack Overflow user
Asked on 2017-11-30 06:36:14
Answers: 2 · Views: 447 · Followers: 0 · Votes: 1

I have the table below. In order to perform a join, I am generating a number sequence for it as a rowId column, but this throws the error shown further down. What am I doing wrong? Please help me with this.

fListVec: org.apache.spark.sql.DataFrame = [features: vector]
+-----------------------------------------------------------------------------+
|features                                                                     |
+-----------------------------------------------------------------------------+
|[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]|
|[0.9558040000000001,0.9843780000000002,0.545025,0.9979860000000002]          |
+-----------------------------------------------------------------------------+

Code:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

val fListrdd = fListVec.rdd
    .map{case Row(features: Vector) => features}
    .zipWithIndex()
    .toDF("features","rowId")    

fListrdd.createOrReplaceTempView("featuresTable")
val f = spark.sql("SELECT features, rowId from featuresTable")
f.show(false)

Output:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 206.0 failed 1 times, most recent failure: Lost task 0.0 in stage 206.0 (TID 1718, localhost, executor driver): scala.MatchError: [2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(<console>:193)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(<console>:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:...)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.rdd.ZippedWithIndexRDD.<init>(ZippedWithIndexRDD.scala:...)
  at org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
  at org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.zipWithIndex(RDD.scala:1292)
  ... 101 elided
Caused by: scala.MatchError: [2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(<console>:193)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(<console>:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  ... 3 more

Expected output:

features                 | rowId
[2.5046410000000003,...] | 0
[0.9558040000000001,...] | 1

2 Answers

Stack Overflow user

Accepted answer

Posted on 2017-11-30 07:50:31

You have to add a map step in between to define the data type for the new DataFrame you are creating:

import org.apache.spark.sql.Row
// Assumed import: the other answer on this page uses the mllib DenseVector type.
import org.apache.spark.mllib.linalg.DenseVector

val fListrdd = fListVec.rdd
  .map{ case Row(features) => features }
  .zipWithIndex()
  .map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt))
  .toDF("features", "rowId")

Here, only the line .map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt)) has been added.

You can go a step further and create a Dataset. I personally recommend Datasets, because they are type-safe and an optimized form of the data.

For this you need a case class:

case class features(features: DenseVector, rowId: Int)

Just add the features case class to the solution above, and you can call the .toDS API to create a type-safe Dataset:

val fListDS = fListVec.rdd
  .map{case Row(features: DenseVector) => features}
  .zipWithIndex()
  .map(x => features(x._1.asInstanceOf[DenseVector], x._2.toInt))
  .toDS
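
For reference, a minimal sketch of how the generated rowId could then drive the join the question mentions; the second table labelsDF and its contents are hypothetical stand-ins, not part of either answer, and spark.implicits._ is assumed to be in scope (as in spark-shell):

// Hypothetical second table with its own rowId column, purely for illustration.
val labelsDF = Seq((0, "a"), (1, "b")).toDF("rowId", "label")

// Join the indexed feature rows to the hypothetical table on rowId.
val joined = fListDS.join(labelsDF, Seq("rowId"))
joined.show(false)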
Votes: 2

Stack Overflow user

Posted on 2017-11-30 08:20:21

You're almost there: you just need to specify the appropriate vector type, DenseVector.

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.Row

val fList = Seq(
  (Seq(2.5046410000000003, 2.1487149999999997, 1.0884870000000002, 3.5877090000000003)),
  (Seq(0.9558040000000001, 0.9843780000000002, 0.545025, 0.9979860000000002))
).toDF("features")

def seqToVec = udf(
  (s: Seq[Double]) => new DenseVector(s.toArray)
)

val fListVec = fList.withColumn("features", seqToVec($"features"))
// fListVec: org.apache.spark.sql.DataFrame = [features: vector]

val fListrdd = fListVec.rdd.
  map{ case Row(features: DenseVector) => features }.
  zipWithIndex.
  toDF("features", "rowId")  

fListrdd.show
// +--------------------+-----+
// |            features|rowId|
// +--------------------+-----+
// |[2.50464100000000...|    0|
// |[0.95580400000000...|    1|
// +--------------------+-----+
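
One detail worth noting (not part of the original answer): zipWithIndex produces a Long index, so rowId comes out as a bigint column here, whereas the accepted answer casts it to Int. If an integer column is needed for the join, a cast along these lines should work:

// Sketch: cast the Long index produced by zipWithIndex down to an integer column.
val fListIndexed = fListrdd.withColumn("rowId", $"rowId".cast("int"))
fListIndexed.printSchema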
Votes: 1
The original content of this page is provided by Stack Overflow; translation supplied by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/47567271
