
Secondary sort in Spark using clojure/flambo

Stack Overflow user
Asked on 2016-07-05 10:54:21
1 answer · 137 views · 0 followers · 0 votes

I have a Scala program in which I have implemented a secondary sort, and it works perfectly. The way I wrote that program is:

Code language: scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

object rfmc {
  // Custom Key and partitioner

  case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
  class RFMCPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[RFMCKey]
      k.cId.hashCode() % numPartitions
    }
  }
  object RFMCKey {
    implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
      Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
    }
  }
  // The body of the code
  //
  //
  // build (key, value) pairs; cust, r, f, m, c are taken from each record v (elided in the original)
  val x = rdd.map(v => (RFMCKey(cust, r, f, m, c), r + "," + f + "," + m + "," + c))
  val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
}

I want to implement the same thing using Clojure's DSL for Spark, called flambo. Since I cannot write the partitioner in Clojure, I reused the code defined above, compiled it, and use it as a dependency in my Clojure code.
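For illustration only, here is a minimal sketch of how the compiled Scala classes might be pulled in as a Leiningen dependency; every coordinate and version below is a hypothetical assumption, not something stated in the original post.

Code language: clojure

;; Hypothetical project.clj sketch: all coordinates and versions are illustrative.
(defproject xyz "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [yieldbot/flambo "0.7.1"]                   ; Clojure DSL for Spark
                 [org.apache.spark/spark-core_2.10 "1.3.1"]  ; Spark itself
                 [org.formcept/wisdom "0.1.0"]])             ; jar containing RFMCKey and RFMCPartitioner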

Now, I import the partitioner and the key in my Clojure code in the following way:

Code language: clojure
(ns xyz
  (:import
    [package RFMCPartitioner]
    [package RFMCKey]
    )
  )

But when I try to create an RFMCKey by calling (RFMCKey. cust_id r f m c), it throws the following error:

Code language: text
java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
    at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
    at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
    at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
    at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My guess is that it is not picking up the ordering that I defined after the partitioner: the stack trace shows Spark falling back to Guava's NaturalOrdering, which casts the key to java.lang.Comparable, so the implicit Ordering from the RFMCKey companion object is apparently not being found. But if it works in Scala, why doesn't it work from Clojure?


1 Answer

Stack Overflow user

Accepted answer

Posted on 2016-07-11 18:43:40

So I finally figured it out myself. Basically, I had to write my custom sorting function as a separate Scala project and then call it from Clojure.

I wrote my Scala file this way:

Code language: scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)
class RFMCPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RFMCKey]
    k.cId.hashCode() % numPartitions
  }
}
object RFMCKey {
  implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
  }
}

class rfmcSort {
  def sortWithRFMC(a: RDD[(String, (((Double, Long), Double), Double))], parts: Int): RDD[(RFMCKey, String)] = {
    val x = a.map(v => v match {
                case (custId, (((rVal, fVal), mVal),cVal)) => (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal+","+fVal+","+mVal+","+cVal)
            }).repartitionAndSortWithinPartitions(new RFMCPartitioner(parts))
    x
  }
}

I compiled it as a Scala project and used it in my Clojure code this way:

Code language: clojure
(:import [org.formcept.wisdom rfmcSort]
         [org.apache.spark.rdd RDD])

sorted-rfmc-records (.toJavaRDD (.sortWithRFMC (rfmcSort.) (.rdd rfmc-records) num_partitions))

Note how I call the sortWithRFMC function on the rfmcSort object that I created. Another very important thing to note here is that when you pass your JavaPairRDD to the Scala function, you first have to convert it into a plain Spark RDD by calling the .rdd method on it. You then have to convert the Spark RDD that comes back into a Java-side RDD (here via .toJavaRDD) before you can work with it in Clojure.
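As a minimal sketch, assuming rfmc-records is a JavaPairRDD and that the binding above sits inside a let (which the snippet elides), the full round trip might look like the following; the wrapper function sort-rfmc and its signature are illustrative, not part of the original post.

Code language: clojure

;; Illustrative wrapper around the compiled Scala helper. Only rfmcSort,
;; sortWithRFMC, .rdd and .toJavaRDD come from the post; the rest is assumed.
(ns xyz
  (:import [org.formcept.wisdom rfmcSort]
           [org.apache.spark.api.java JavaPairRDD]))

(defn sort-rfmc
  "Secondary-sorts a JavaPairRDD of RFMC records via the Scala rfmcSort helper."
  [^JavaPairRDD rfmc-records num-partitions]
  (-> (rfmcSort.)                          ; instantiate the Scala class
      (.sortWithRFMC (.rdd rfmc-records)   ; JavaPairRDD -> underlying Scala RDD
                     (int num-partitions))
      (.toJavaRDD)))                       ; Scala RDD -> JavaRDD of (RFMCKey, value) tuples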

Votes: 0
The original content of this page was provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/38201613
