首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >对Spark/Graphx/Pregel示例程序中的停止条件感到困惑,以便找到路径距离

对Spark/Graphx/Pregel示例程序中的停止条件感到困惑,以便找到路径距离
EN

Stack Overflow用户
提问于 2019-05-22 10:30:15
回答 1查看 387关注 0票数 1

我正在学习Graphx In Action,这本书(源代码在这里:https://github.com/insidedctm/spark-graphx-in-action)讨论了两种计算树根和所有节点之间距离(边跳数)的方法。我理解使用aggregateMessages提供的代码示例。特别是,停止条件是有意义的(我已经通过下面的注释突出显示了该条件,其中包括文本“停止条件”)。一旦图的顶点上的属性停止更改,继续运行算法就没有意义了。

当我看着Pregel计算相同结果的方式时,我有点困惑(如下所示)。

特别是当调用Pregel的apply方法时,maxIterations是缺省值Integer.MAX_VALUE (实际上它是‘永远运行’)。因此,看起来'sendMsg‘函数是:

代码语言:javascript
复制
               (et:EdgeTriplet[Int,String]) =>
                    Iterator((et.dstId, et.srcAttr+1)),

将被无限调用,即使在顶点上的值收敛之后也是如此。

是否有一些我忽略的机制导致程序在收敛后停止?

代码语言:javascript
复制
// aggregateMessages approach
// from: https://github.com/insidedctm/spark-graphx-in-action/blob/51e4c667b927466bd02a0a027ca36625b010e0d6/Chapter04/Listing4_10IteratedFurthestVertex.scala

def sendMsg(ec: EdgeContext[Int,String,Int]): Unit = {
  ec.sendToDst(ec.srcAttr+1)
}

def mergeMsg(a: Int, b: Int): Int = {
  math.max(a,b)
}

def propagateEdgeCount(g:Graph[Int,String])
 :Graph[Int,String] = {    
  val verts = 
        g.aggregateMessages[Int](sendMsg, mergeMsg)
  val g2 = 
        Graph(verts, g.edges)
  val check = 
        g2.vertices.join(g.vertices).
           map(x => x._2._1 – x._2._2).
           reduce(_ + _)

  // STOP CONDITION
  // check here ensures stop if nothing changed  (******)
  if (check > 0)            
    propagateEdgeCount(g2)
  else
    g
}

// Pregel approach

val g = Pregel(myGraph.mapVertices((vid,vd) => 0), 0,
               activeDirection = EdgeDirection.Out)(
               (id:VertexId,vd:Int,a:Int) => math.max(vd,a),
               (et:EdgeTriplet[Int,String]) =>
                    Iterator((et.dstId, et.srcAttr+1)),
               (a:Int,b:Int) => math.max(a,b))
g.vertices.collect
EN

回答 1

Stack Overflow用户

发布于 2019-12-25 18:05:49

据我所知,如果所有的节点都停止工作,那么pregel就会自动停止工作。

  有两种方法可以停止所有节点,这两种方法可以通过所有节点的属性不变来实现:

  • 1.给出一个发送消息的条件,换句话说,如果给定的条件为假,节点将停止向message.
  • 2.Give发送一个函数,即所有节点在多次迭代后都将停止,也就是说,虽然发送消息的条件仍然为真,但所有节点的属性都不变。

val bfs2 = initialGraph2.pregel(Double.PositiveInfinity)( (id,attr,msg) => math.min(attr,msg),三重=> { if (triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr == Double.PositiveInfinity) {Iterator((triplet.dstId,triplet.srcAttr+1))} else {Iterator.empty}},(a,b) => math.min(a,b ) .cache()

"triplet.dstAttr == Double.PositiveInfinity"是继续条件。

如果所有节点都小于Double.PositiveInfinity,则发送消息操作将停止,显然,所有节点都将停止。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56248530

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档