首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花分选

火花分选
EN

Stack Overflow用户
提问于 2017-10-09 03:39:52
回答 2查看 2K关注 0票数 1

我要整理一下RDD。排序需要在我的记录的多个字段上,因此我需要一个自定义比较器。

我看到sortBy作为它只接受一个键。我偶然发现了http://codingjunkie.net/spark-secondary-sort/,因此使用repartitionAndSortWithinPartitions实现了同样的目标。

为什么sortBy不接受自定义比较器和排序?为什么我必须重新划分才能使用自定义比较器?

EN

回答 2

Stack Overflow用户

发布于 2017-10-09 04:09:03

Question1:这是方法sortBy签名

代码语言:javascript
复制
  /**
   * Return this RDD sorted by the given key function.
   */
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }

您的RDD数据对象显然是T类型的

请注意,sortBy方法绝对有单键参数字段: f: (T) => K

它接受匿名函数,因此您可以轻松地生成自定义的可比结构,并最大限度地利用具有自己定义良好的比较器的公共数据类型。

例如,如果您的RDDInt,Int,我们称之为data,您可以这样做:

代码语言:javascript
复制
val cmp = (t: (Int, Int)) => (t._1, -t._2)
data.sortBy(cmp)

这可以很容易地实现多个领域,对吗?

这将得到一个排序的RDD与第一场上升和第二场下降。

Question2:repartitionAndSortWithinPartitions usage

这是一个特定的rdd操作符,其目标是比调用重新分区并在每个分区中进行排序更有效。

在排序之前,您的程序不需要预先重新分区,它只是在这种特殊的通用模式下进行内部优化,以获得高性能。

详情请参阅文档

票数 1
EN

Stack Overflow用户

发布于 2017-10-09 05:25:20

  • 使用mapPartitions对每个分区进行排序,例如使用.sorted
  • repartitionAndSortWithinPartitions可以在同时重新分区的同时有效地对分区进行排序。
  • sortBy来创建一个全局有序的RDD
  • RDD的sortByKey方法用于全序
  • RDD的repartitionAndSortWithinPartitions是在分区内使用排序,而不是跨分区,但不幸的是,它增加了一个额外的步骤来执行重新分区

正如Spark中所写的那样,repartitionAndSortWithinPartitions比调用重新分区并在每个分区中排序更有效--换句话说,repartitionAndSortWithinPartitions将首先根据提供的分区器重新划分数据,然后按键进行排序:

因此,首先重新分区,然后调用sortBy,给您很好的性能,您可以使用repartitionAndSortWithinPartitions实现。

添加几个排序示例,希望能有所帮助。

Ex 1

代码语言:javascript
复制
val rdd = sc.parallelize(Seq(
     |                ("math",    55),
     |                ("math",    56),
     |                ("english", 57),
     |                ("english", 58),
     |                ("science", 59),
     |                ("science", 54)))

rdd.collect()

//Default Sorting : Ascending order
val sorted1 = rdd.sortByKey()

 sorted1.collect()

 //Custom Sorting : Descending order (using implicit 'Ordering')
 {
     |    //Let us define an implicit sorting for the method sortByKey()
     |    //We have used '{' above to limit the scope of the implicit ordering
     |    implicit val sortIntegersByString = new Ordering[String] {
     |       override def compare(a: String, b: String) = {
     |          val result = a.compare(b)
     |          //We use -ve to sort the key in descending order
     |          -result
     |       }
     |    }
     |    val sorted2 = rdd.sortByKey()
     |
     |    //Result
     |    sorted2.collect()
     | }

//Default Sorting : Descending order (done using the 'ascending' flag argument)
 val sorted3 = rdd.sortByKey(false)

//Result
sorted3.collect()

结果:

代码语言:javascript
复制
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[101] at parallelize at command-1784487111427703:1
sorted1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[104] at sortByKey at command-1784487111427703:12
sorted3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[110] at sortByKey at command-1784487111427703:34
res28: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58))

Ex 2

代码语言:javascript
复制
case class Row(var firstName: String, var lastName: String, var city: String)

var rows = List(new Row("Oscar", "Wilde", "London"),
                new Row("Otto",  "Swift", "Berlin"),
                new Row("Carl",  "Swift", "Paris"),
                new Row("Hans",  "Swift", "Dublin"),
                new Row("Hugo",  "Swift", "Sligo"))

//print ("sort by last name")
//rows.sortBy(_.lastName)


print ("sort by last name and first name")

rows.sortBy(r => (r.lastName, r.firstName))



sort by last name and first namedefined class Row
rows: List[Row] = List(Row(Oscar,Wilde,London), Row(Otto,Swift,Berlin), Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo))
res26: List[Row] = List(Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo), Row(Otto,Swift,Berlin), Row(Oscar,Wilde,London))

RDD与Dataset:

代码语言:javascript
复制
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
case class MyRecord(time: Double, id: String)
val rdd = sc.parallelize(1 to 200, 200).flatMap(x =>Seq.fill(10000)(MyRecord(util.Random.nextDouble, "xxx")))
// sort this RDD by time:
val sorted = rdd.sortBy(x => x.time)
result.count

// convert the original RDD to Dataframe and sort again:
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("data")
val result = sqlContext.sql("select * from data order by time")
result.count
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46638504

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档