
Spark GraphX: time cost per round increases steadily and linearly

Stack Overflow user
Asked 2016-06-22 16:23:49

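I am using the GraphX API in an iterative algorithm. Although I carefully cache and unpersist the RDDs and keep an eye on the number of vertex partitions, the time cost per round still seems to grow linearly. Here is a simplified version of my code that shows the same problem: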

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer


object ComputingTimeProblem extends App {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)
    val conf = new SparkConf().setMaster("local[1]").setAppName("test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    var graph = GraphGenerators
        .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
        .cache
    graph.vertices.take(10).foreach(println)

    val maxIter = 50

    var preGraph: Graph[Double, Int] = null
    var allTime: ArrayBuffer[Double] = ArrayBuffer()
    for (i <- 1 to maxIter) {

        val begin = System.currentTimeMillis()

        preGraph = graph

        val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
        graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
        graph.vertices.take(10)

        preGraph.unpersist()

        val end = System.currentTimeMillis()

        val duration = (end - begin) / (60 * 1000d)
        allTime += duration
        println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
            duration, graph.vertices.getNumPartitions))
    }

    graph.vertices.take(10).foreach(println)

    val avgTime = allTime.sum / allTime.size
    println(s"Average Time = ${avgTime}")

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
    timeCostDiffs
        .zipWithIndex
        .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2+1, x._2 + 2, x._1))
        .foreach(println)

    println("tc\n"+allTime.mkString("\n"))
}

The time cost trend is as follows (figure not included):

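I do not change the index of the graph object, and GraphX joins the vertices with the leftZipJoin method, which needs no shuffle. So why does the time cost still increase every round? Can anyone offer some constructive suggestions? Thanks!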


1 Answer

Stack Overflow user

Answered 2016-06-22 21:24:25

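I just found out that this is still a lineage problem. A Graph object holds two RDDs: the vertex RDD and the edge RDD. In the code above I only materialized the vertex RDD, not the edge RDD, so every round recomputes the edges of the previous rounds. Materializing both RDDs through the triplets view solves the problem, as follows: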

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer


object ComputingTimeProblem extends App {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)
    val conf = new SparkConf().setMaster("local[1]").setAppName("test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    var graph = GraphGenerators
        .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
        //        .partitionBy(PartitionStrategy.RandomVertexCut,8)
        .cache
    graph.vertices.take(10).foreach(println)

    val maxIter = 50

    var preGraph: Graph[Double, Int] = null
    var allTime: ArrayBuffer[Double] = ArrayBuffer()
    for (i <- 1 to maxIter) {
        val begin = System.currentTimeMillis()

        preGraph = graph

        val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
        graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
        graph.triplets.take(10) // triplets reads both the vertex and the edge RDD, so this materializes both
        // graph.vertices.take(10)

        preGraph.unpersist()

        val end = System.currentTimeMillis()

        val duration = (end - begin) / (60 * 1000d)
        allTime += duration
        println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
            duration, graph.vertices.getNumPartitions))
    }

    graph.vertices.take(10).foreach(println)

    val avgTime = allTime.sum / allTime.size
    println(s"Average Time = ${avgTime}")

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
    timeCostDiffs
        .zipWithIndex
        .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2 + 1, x._2 + 2, x._1))
        .foreach(println)


    println("tc\n" + allTime.mkString("\n"))

}
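
A possible variation on the fix above, offered as a sketch and not as part of the original answer: `take(10)` only has to scan as many partitions as it needs to return ten elements, so on a graph with several partitions it may leave some of them unmaterialized. Calling `count()` on both `graph.vertices` and `graph.edges` visits every partition. The object name `MaterializeBothRdds`, the helper `runRounds`, and the small graph size are assumptions chosen for illustration.

```scala
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

// Hypothetical helper object, not from the original post.
object MaterializeBothRdds {

  // Runs `rounds` iterations of the same update as in the question and
  // returns the (vertex count, edge count) observed in each round.
  def runRounds(sc: SparkContext, numVertices: Int, rounds: Int): Seq[(Long, Long)] = {
    var graph: Graph[Double, Int] = GraphGenerators
      .logNormalGraph(sc, numVertices)
      .mapVertices((_, _) => 1d)
      .cache()

    val counts = ArrayBuffer.empty[(Long, Long)]
    for (i <- 1 to rounds) {
      val begin = System.currentTimeMillis()
      val preGraph = graph

      val sums = graph.triplets
        .map(tri => (tri.srcId, tri.dstAttr))
        .reduceByKey(_ + _)
      graph = graph.joinVertices(sums)((_, left, right) => left + right).cache()

      // count() touches every partition of each RDD, so both the vertex RDD
      // and the edge RDD are computed and cached before the old graph is dropped.
      val v = graph.vertices.count()
      val e = graph.edges.count()

      preGraph.unpersist()

      val millis = System.currentTimeMillis() - begin
      println(s"Round $i: $v vertices, $e edges, $millis ms")
      counts += ((v, e))
    }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("materialize-sketch")
    val sc = new SparkContext(conf)
    try runRounds(sc, 1000, 10)
    finally sc.stop()
  }
}
```

If the per-round times printed by this sketch stay roughly flat, the edge RDD is no longer being recomputed from earlier rounds. Whether `count()` or `triplets.take(10)` is cheaper depends on the graph and partitioning, so it is worth measuring both.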
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/37962464
