I am using the GraphX API in an iterative algorithm. Although I carefully cache/unpersist the RDDs and take care of the number of vertex partitions, the time cost per round still seems to grow linearly. A simplified version of my code, which reproduces the same problem, is below:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object ComputingTimeProblem extends App {

  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getLogger("akka").setLevel(Level.ERROR)

  val conf = new SparkConf().setMaster("local[1]").setAppName("test")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  var graph = GraphGenerators
    .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
    .cache
  graph.vertices.take(10).foreach(println)

  val maxIter = 50
  var preGraph: Graph[Double, Int] = null
  var allTime: ArrayBuffer[Double] = ArrayBuffer()

  for (i <- 1 to maxIter) {
    val begin = System.currentTimeMillis()

    preGraph = graph
    val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
    graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
    graph.vertices.take(10)
    preGraph.unpersist()

    val end = System.currentTimeMillis()
    val duration = (end - begin) / (60 * 1000d)
    allTime += duration
    println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
      duration, graph.vertices.getNumPartitions))
  }

  graph.vertices.take(10).foreach(println)

  val avgTime = allTime.sum / allTime.size
  println(s"Average Time = ${avgTime}")

  val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
  timeCostDiffs
    .zipWithIndex
    .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2 + 1, x._2 + 2, x._1))
    .foreach(println)

  println("tc\n" + allTime.mkString("\n"))
}

The time cost trend looks like this:

I did not change the index of the graph object, and GraphX joins the vertices via leftZipJoin, which does not require a shuffle. So why does the time cost per round still increase? Can anyone offer some constructive suggestions? Thanks!
Posted on 2016-06-22 21:24:25
It is indeed a lineage problem, as I just found out. A Graph object holds two RDDs: a vertex RDD and an edge RDD. In the code above I only materialized the vertex RDD, not the edge RDD, so each round recomputes all of the previous rounds' edges from scratch. Materializing both RDDs via the triplets RDD solves the problem, as follows:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object ComputingTimeProblem extends App {

  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getLogger("akka").setLevel(Level.ERROR)

  val conf = new SparkConf().setMaster("local[1]").setAppName("test")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  var graph = GraphGenerators
    .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
    //  .partitionBy(PartitionStrategy.RandomVertexCut, 8)
    .cache
  graph.vertices.take(10).foreach(println)

  val maxIter = 50
  var preGraph: Graph[Double, Int] = null
  var allTime: ArrayBuffer[Double] = ArrayBuffer()

  for (i <- 1 to maxIter) {
    val begin = System.currentTimeMillis()

    preGraph = graph
    val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
    graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
    graph.triplets.take(10) // here materialize both the vertex and the edge RDD
    //  graph.vertices.take(10)
    preGraph.unpersist()

    val end = System.currentTimeMillis()
    val duration = (end - begin) / (60 * 1000d)
    allTime += duration
    println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
      duration, graph.vertices.getNumPartitions))
  }

  graph.vertices.take(10).foreach(println)

  val avgTime = allTime.sum / allTime.size
  println(s"Average Time = ${avgTime}")

  val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
  timeCostDiffs
    .zipWithIndex
    .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2 + 1, x._2 + 2, x._1))
    .foreach(println)

  println("tc\n" + allTime.mkString("\n"))
}

https://stackoverflow.com/questions/37962464
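As a side note, the same per-round update can also be written with GraphX's `aggregateMessages`, the library's native primitive for the `triplets.map(...).reduceByKey` pattern: it combines messages inside each edge partition before shipping them to the vertices. A minimal sketch on a tiny hand-built graph (the object name, the example edges, and the helper method are illustrative, not from the original post):

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.{SparkConf, SparkContext}

object AggregateMessagesSketch {

  // One round of "add the sum of incoming neighbour values" on a tiny graph:
  // edges 1 -> 2, 1 -> 3, 2 -> 3, every vertex starting at 1.0.
  def oneRound(sc: SparkContext): Map[Long, Double] = {
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
    var graph = Graph.fromEdges(edges, 1.0).cache()

    // Equivalent to triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _):
    // each edge sends the source value to its destination, sums are merged per vertex.
    val sums = graph.aggregateMessages[Double](
      ctx => ctx.sendToDst(ctx.srcAttr),
      _ + _)
    graph = graph.joinVertices(sums)((_, old, sum) => old + sum).cache()

    graph.triplets.count() // materialize both the vertex and the edge RDD
    graph.vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("agg-sketch"))
    sc.setLogLevel("ERROR")
    oneRound(sc).toSeq.sorted.foreach(println) // (1,1.0), (2,2.0), (3,3.0)
    sc.stop()
  }
}
```

Vertex 1 receives no messages and keeps its initial 1.0; vertex 2 receives 1.0 from vertex 1; vertex 3 receives 1.0 from each of vertices 1 and 2.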
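Beyond materializing both RDDs, a common mitigation for growing lineage in iterative Spark jobs is periodic checkpointing, which truncates the dependency chain entirely. A minimal sketch (the object name, checkpoint directory, interval, and graph size are illustrative assumptions, not from the original post; `checkpoint()` must be called before the graph is materialized):

```scala
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {

  def run(sc: SparkContext, rounds: Int): Graph[Double, Int] = {
    var graph = GraphGenerators
      .logNormalGraph(sc, 100).mapVertices((_, _) => 1d)
      .cache()

    for (i <- 1 to rounds) {
      val prev = graph
      val sums = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
      graph = graph.joinVertices(sums)((_, old, s) => old + s).cache()

      // Every few rounds, mark the graph for checkpointing; this cuts the
      // lineage of both member RDDs once the graph is next materialized.
      if (i % 10 == 0) graph.checkpoint()

      graph.triplets.count() // materialize vertices and edges (and the checkpoint)
      prev.unpersist()
    }
    graph
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("ckpt-sketch"))
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir("/tmp/graphx-ckpt") // illustrative path
    println(run(sc, 20).vertices.count())
    sc.stop()
  }
}
```

Checkpointing writes the RDDs to stable storage, so it trades extra I/O for a bounded lineage; an interval around every 10 rounds is a typical starting point.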