首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >预排序输入的火花特征向量变换

预排序输入的火花特征向量变换
EN

Stack Overflow用户
提问于 2016-01-26 18:13:53
回答 1查看 268关注 0票数 1

在HDFS上的一个标签分隔文件中有一些数据,如下所示:

代码语言:javascript
复制
label | user_id | feature
------------------------------
  pos | 111     | www.abc.com
  pos | 111     | www.xyz.com
  pos | 111     | Firefox
  pos | 222     | www.example.com
  pos | 222     | www.xyz.com
  pos | 222     | IE
  neg | 333     | www.jkl.com
  neg | 333     | www.xyz.com
  neg | 333     | Chrome

我需要对其进行转换,为每个user_id创建一个特征向量,以训练一个org.apache.spark.ml.classification.NaiveBayes模型。

我目前的做法基本上如下:

  1. 将原始数据加载到DataFrame中
  2. 用StringIndexer索引特性
  3. 通过user_id转到RDD和Group,并将特征索引映射到稀疏向量中。

关键是这个..。数据已经由user_id预排序。什么是最好的方法来利用它?想到可能会发生多少不必要的工作,我感到很痛苦。

如果一些代码有助于理解我当前的方法,下面是映射的本质:

代码语言:javascript
复制
val featurization = (vals: (String,Iterable[Row])) => {
  // create a Seq of all the feature indices
  // Note: the indexing was done in a previous step not shown
  val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq

  // create the sparse vector
  val featureVector = Vectors.sparse(maxIndex, seq)

  // convert the string label into a Double
  val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0

  (label, vals._1, featureVector)
}

d.rdd
  .groupBy(_.getString(1))
  .map(featurization)
  .toDF("label","user_id","features")
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-01-28 00:13:38

让我们从your other question开始

如果我在磁盘上的数据保证按键预排序,该键将用于组聚合或减少,那么是否有任何方法可以让星火利用这一点呢?

那得看情况。如果您应用的操作可以从映射端聚合中受益,那么您可以通过预置数据获得相当多的收益,而无需对代码进行任何进一步的干预。共享相同密钥的数据应该位于相同的分区上,并且可以在洗牌前在本地进行聚合。

不幸的是,在这种特殊情况下,它不会有多大帮助。即使启用了映射端聚合(groupBy(Key)不使用is,因此需要自定义实现)或功能向量上的聚合(在我对How to define a custom aggregation function to sum a column of Vectors?的答复中可以找到一些示例),也没有什么收获。您可以在任何地方保存一些工作,但仍然必须在节点之间传输所有索引。

如果你想得到更多,你就得做更多的工作。我可以看到利用现有订单的两种基本方法:

  1. 使用自定义Hadoop输入格式只生成完整的记录(标签、id、所有功能),而不是逐行读取数据。如果您的数据每个id有固定的行数,您甚至可以尝试使用NLineInputFormat,然后将mapPartitions应用于聚合记录。 这无疑是更详细的解决方案,但不需要额外的洗牌在星火。
  2. 像往常一样读取数据,但对groupBy使用自定义分区程序。据我所知,使用rangePartitioner应该工作得很好,但为了确保您可以尝试以下过程:
代码语言:javascript
复制
- use `mapPartitionsWithIndex` to find minimum / maximum id per partition. 
- create partitioner which keeps  minimum <= ids < maximum on the current (_i-th_) partition and pushes maximum to the partition _i + 1_
- use this partitioner for `groupBy(Key)`

这可能是更友好的解决方案,但至少需要一些洗牌。如果预计要移动的记录数量很低(<< # records -每个分区),您甚至可以不用使用mapPartitionsbroadcast*来处理这个问题,尽管在实践中进行分区可能更有用,也更便宜。

*您可以使用类似于此的方法:https://stackoverflow.com/a/33072089/1560062

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35020895

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档