Spark column wise word count

Stack Overflow user
Asked on 2015-01-27 08:36:41
2 Answers · 3.8K Views · 0 Followers · 3 Votes

We are trying to generate column-wise statistics for our dataset in Spark, in addition to using the summary function from the statistics library. We are using the following procedure:

  1. We determine the columns with string values.
  2. We generate key-value pairs for the whole dataset, with the column number as the key and the column's value as the value.
  3. We generate a new map of the format (K,V) -> ((K,V),1).

Then we use reduceByKey to count the occurrences of each unique value across all the columns. We cache this output to reduce further computation time.

In the next step, we loop over the columns with a for loop to compute the statistics for each column.

We are trying to replace the for loop with another map-reduce pass, but we cannot find a way to achieve it. Doing so would let us generate the column statistics for all columns in a single execution. The for loop runs sequentially over the columns, which makes it very slow.

Code:

    // Drops the header row (the first line of partition 0).
    def dropHeader(data: RDD[String]): RDD[String] = {
      data.mapPartitionsWithIndex((idx, lines) => {
        if (idx == 0) {
          lines.drop(1)
        }
        lines
      })
    }

    // Splits a CSV line and pairs each field with its column index.
    def retAtrTuple(x: String) = {
      val newX = x.split(",")
      for (h <- 0 until newX.length)
        yield (h, newX(h))
    }



    val line = sc.textFile("hdfs://.../myfile.csv")

    val withoutHeader: RDD[String] = dropHeader(line)

    val kvPairs = withoutHeader.flatMap(retAtrTuple) //generates a key-value pair where key is the column number and value is column's value


    var bool_numeric_col = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_).sortByKey()    //this contains column indexes as key and boolean as value (true for numeric and false for string type)

    var str_cols = bool_numeric_col.filter{case (x,y) => y == false}.map{case (x,y) => x}
    var num_cols = bool_numeric_col.filter{case (x,y) => y == true}.map{case (x,y) => x}

    var str_col = str_cols.toArray   //array consisting the string col
    var num_col = num_cols.toArray   //array consisting numeric col


    val colCount = kvPairs.map((_,1)).reduceByKey(_+_)
    val e1 = colCount.map{case ((x,y),z) => (x,(y,z))}
    var numPairs = e1.filter{case (x,(y,z)) => str_col.contains(x) }

    //This for loop needs to be parallelized/optimized, as it operates on each column sequentially. The idea is to find the top 10, bottom 10, and the number of distinct elements column-wise.
    for(i <- str_col){
       var total = numPairs.filter{case (x,(y,z)) => x==i}.sortBy(_._2._2)
       var leastOnes = total.take(10)
       println("leastOnes for Col" + i)
       leastOnes.foreach(println)
       var maxOnes = total.sortBy(-_._2._2).take(10)
       println("maxOnes for Col" + i)
       maxOnes.foreach(println)
       println("distinct for Col" + i + " is " + total.count)
    }

2 Answers

Stack Overflow user

Accepted answer

Posted on 2015-01-28 18:05:07

Let me simplify your question a bit. (A lot, actually.) We have an RDD[(Int, String)], and for each Int (they are all in the 0-100 range) we want to find the top 10 most common Strings.

Instead of sorting, as in your example, it is more efficient to use Spark's built-in RDD.top(n) method. Its run time is linear in the size of the data, and it requires moving far less data around than a sort does.
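
For a single column, that might look like the following sketch (columnValues is a hypothetical RDD[String] holding one column's values):

// A minimal sketch, assuming columnValues: RDD[String] holds one column.
// top(n) keeps only n candidates per partition, so no full sort is needed.
val counts = columnValues.map(_ -> 1L).reduceByKey(_ + _)
val top10: Array[(String, Long)] = counts.top(10)(Ordering.by(_._2))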

Take a look at the implementation of top in RDD.scala. You want to do the same thing, but with one priority queue (heap) per Int key. The code becomes fairly complex:

import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the word counts.
  val counts: RDD[[(Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for (((k, v), count) <- items) {
        heaps(k) += count -> v
      }
      Iterator.single(heaps)
    }
  // Merge the per-partition heap maps into one.
  val merged: Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }
  // Discard counts, return just the top strings.
  merged.mapValues(_.map { case(count, value) => value })
}
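
For reference, calling this helper would look like the sketch below (hypothetical data; as written above, the code still needs the fixes pointed out in the second answer before it compiles):

val words = sc.parallelize(Seq((1, "a"), (1, "a"), (1, "b"), (2, "x")))
val topPerColumn: Map[Int, Iterable[String]] = top(10, words)
// e.g. Map(1 -> Iterable(a, b), 2 -> Iterable(x))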

This works, but it is painful, because we have to handle multiple columns at the same time. It would be much easier to have one RDD per column, so we could simply call rdd.top(10) on each.

Unfortunately, the naive way of splitting the RDD into N smaller RDDs makes N passes over the data:

def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map {
    i => together.filter { case (key, value) => key == i }.values
  }
}
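
A usage sketch (assuming kvPairs comes from the question's code; numColumns is a hypothetical column count). Note that every .top() call below is a separate pass over the cached data:

val perColumn: Seq[RDD[String]] = split(kvPairs, numColumns) // numColumns is hypothetical
for ((columnRdd, i) <- perColumn.zipWithIndex) {
  val counts = columnRdd.map(_ -> 1L).reduceByKey(_ + _)
  println(s"top 10 for column $i: " + counts.top(10)(Ordering.by(_._2)).mkString(", "))
}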

A more efficient solution could be to write the data out into separate files by key, and then load those files into separate RDDs. This is discussed in Write to multiple outputs by key Spark - one Spark job.
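
A sketch of that approach, based on the technique from the linked question (the output path is hypothetical):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each record to a file named after its key and writes only the value.
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

// One job writes every column into its own file; each file can then be
// loaded back with sc.textFile as an independent RDD.
kvPairs.saveAsHadoopFile("hdfs://.../by-column", classOf[Any], classOf[Any],
  classOf[KeyBasedOutput])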

Votes: 2

Stack Overflow user

Posted on 2018-02-11 11:55:32

Thanks for Daniel Darabos's answer. But it contains some errors:

  1. Mixed usage of Map and collection.mutable.Map.
  2. withDefault((i: Int) => new Heap(n)) does not create and store a new Heap when heaps(k) += count -> v is executed (see the sketch after this list).
  3. Mixed usage of brackets.
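
A minimal illustration of the withDefault pitfall (plain Scala, with a hypothetical Buffer standing in for the heap):

import scala.collection.mutable

val m = mutable.Map[Int, mutable.Buffer[Int]]().withDefault(_ => mutable.Buffer[Int]())
m(1) += 42              // mutates a fresh default Buffer that is never stored
println(m.contains(1))  // false -- the update was silently lost

// The fix used in the code below: create and store the value explicitly.
if (!m.contains(2)) m(2) = mutable.Buffer[Int]()
m(2) += 42
println(m(2))           // Buffer(42)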

Here is the corrected code:

// import org.apache.spark.util.BoundedPriorityQueue  // It is private[spark]: copy the class into your own package and import your copy instead.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object BoundedPriorityQueueTest {

  //  https://stackoverflow.com/questions/28166190/spark-column-wise-word-count
  def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
    // A heap that only keeps the top N values, so it has bounded size.
    type Heap = BoundedPriorityQueue[(Long, String)]
    // Get the word counts.
    val counts: RDD[((Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
    // In each partition create a column -> heap map.
    val perPartition: RDD[collection.mutable.Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
      for (((k, v), count) <- items) {
        println("\n---")
        println("before add " + ((k, v), count) + ", the map is: ")
        println(heaps)
        if (!heaps.contains(k)) {
          println("not contains key " + k)
          heaps(k) = new Heap(n)
          println(heaps)
        }
        heaps(k) += count -> v
        println("after add " + ((k, v), count) + ", the map is: ")
        println(heaps)

      }
      println(heaps)
      Iterator.single(heaps)
    }
    // Merge the per-partition heap maps into one.
    val merged: collection.mutable.Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap]() //.withDefault((i: Int) => new Heap(n))
      println(heaps)
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        // Same pitfall as above: the heap must be created and stored
        // explicitly before updating it.
        if (!heaps.contains(k)) {
          heaps(k) = new Heap(n)
        }
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }
    // Discard counts, return just the top strings.
    merged.mapValues(_.map { case (count, value) => value }).toMap
  }

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
    val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console


    val words = sc.parallelize(List((1, "s11"), (1, "s11"), (1, "s12"), (1, "s13"), (2, "s21"), (2, "s22"), (2, "s22"), (2, "s23")))
    println("# words:" + words.count())

    val result = top(1, words)

    println("\n--result:")
    println(result)
    sc.stop()

    print("DONE")
  }

}
Votes: 0
Original link: https://stackoverflow.com/questions/28166190
