首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用h2o mojo模型对spark集群并行化问题进行预测

使用h2o mojo模型对spark集群并行化问题进行预测
EN

Stack Overflow用户
提问于 2018-01-03 19:55:48
回答 2查看 485关注 0票数 0

我在Spark集群上使用h2o模型( mojo格式)时遇到了问题,但只有当我尝试并行运行它时,而不是当我使用collect并在驱动程序上运行它时。

由于我预测的数据帧具有超过100个特征,因此我使用以下函数将数据帧行转换为h2o的RowData格式(来自here):

代码语言:javascript
复制
def rowToRowData(df: DataFrame, row: Row): RowData = {
  val rowAsMap = row.getValuesMap[Any](df.schema.fieldNames)
  val rowData = rowAsMap.foldLeft(new RowData()) { case (rd, (k,v)) =>
    if (v != null) { rd.put(k, v.toString) }
    rd
  }
  rowData
}

然后,我导入mojo模型并创建一个easyPredictModel包装器

代码语言:javascript
复制
val mojo = MojoModel.load("/path/to/mojo.zip")
val easyModel = new EasyPredictModelWrapper(mojo)

现在,如果我首先收集数据帧(df),则可以通过在行上映射来对其进行预测,因此可以使用以下方法:

代码语言:javascript
复制
val predictions = df.collect().map { r =>
  val rData = rowToRowData(df, r) . // convert row to RowData using function
  val prediction = easyModel.predictBinomial(rData).label
  (r.getAs[String]("id"), prediction.toInt)
  }
  .toSeq
  .toDF("id", "prediction")

但是,我希望在集群上并行执行此操作,因为最终的df太大,无法在驱动程序上收集。但是如果我没有先收集数据就尝试运行相同的代码:

代码语言:javascript
复制
val predictions = df.map { r =>
  val rData = rowToRowData(df, r)
  val prediction = easyModel.predictBinomial(rData).label
  (r.getAs[String]("id"), prediction.toInt)
}
  .toDF("id", "prediction")

我得到以下错误:

代码语言:javascript
复制
18/01/03 11:34:59 WARN TaskSetManager: Lost task 0.0 in stage 118.0 (TID 9914, 213.248.241.182, executor 0): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

因此,它看起来像是数据类型不匹配。我尝试先将数据帧转换为rdd (即df.rdd.map,但得到相同的错误),执行df.mapPartition,或将rowToData函数代码放在映射中,但到目前为止都没有起作用。

有没有关于实现这一点的最佳方法的想法?

EN

回答 2

Stack Overflow用户

发布于 2018-01-06 19:37:53

我发现了一些混乱的Spark ticket https://issues.apache.org/jira/browse/SPARK-18075,描述了与提交Spark应用程序的不同方式相关的相同问题。看一看,也许它会给你一个关于你的问题的线索。

票数 0
EN

Stack Overflow用户

发布于 2018-01-26 23:36:04

你不能打电话给prediction.toInt。返回的预测是一个元组。您需要提取该元组的第二个元素来获得级别1的实际分数。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48076913

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档