首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在简单集合上触发RDD递归操作

在简单集合上触发RDD递归操作
EN

Stack Overflow用户
提问于 2018-04-29 20:06:34
回答 1查看 473关注 0票数 1

我在RDD中有用户信息:

代码语言:javascript
复制
(Id:10, Name:bla, Adress:50, ...)

我有另一个集合,包含我们为每个用户收集的身份的连续变化。

代码语言:javascript
复制
(lastId, newId)
    (10, 43)
    (85, 90)
    (43, 50)

在本例中,我需要获得每个用户id的最后一个标识:

代码语言:javascript
复制
getFinalIdentity(10) = 50     (10 -> 43 -> 50)

有一段时间,我使用了一个包含这些标识的广播变量,并在集合中迭代以获得最终的ID。

我想出了一个解决方案,使用RDD来存储标识,并对其进行递归迭代,但它并不快,而且在我看来非常复杂。

有一个优雅和快速的方法来做这个吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-30 07:00:30

你想过图表吗?

您可以从边列表中创建一个图形,如(lastId, newId)。这样,没有传出边的节点是没有传入边缘的节点的最终id。

这可以在星火与GraphX。

下面是一个例子。它为每个Id显示链中第一个Id的ID。这意味着,对于id (1 -> 2 -> 3)的这种更改,结果将是(1, 1), (2, 1), (3, 1)

代码语言:javascript
复制
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {

    sc.setLogLevel("ERROR")

    // RDD of pairs (oldId, newId)
    val changedIds = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (10L, 20L), (20L, 31L), (30L, 40L), (100L, 200L), (200L, 300L)))

    // case classes for pregel operation
    case class Value(originId: VertexId)      // vertex value
    case class Message(value: VertexId)       // message sent from one vertex to another

    // Create graph from id pairs
    val graph = Graph.fromEdgeTuples(changedIds, Value(0))

    // Initial message will be sent to all vertexes at the start
    val initialMsg = Message(0)

    // How vertex should process received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      // Initial message will have value 0. In that case current vertex need to initialize its value to its own ID
      if (msg.value == 0) Value(vertexId)
      // Otherwise received value is initial ID
      else Value(msg.value)
    }

    // How vertexes should send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // For the triplet only single message shall be sent to destination vertex
      // Its payload is source vertex origin ID
      Iterator((triplet.dstId, Message(triplet.srcAttr.originId)))
    }

    // How incoming messages to one vertex should be merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // Generally for this case it's an error
      // Because one ID can't have 2 different originIDs
      msg2    // Just return any of the incoming messages
    }

    // Kick out pregel calculation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    // Print results
    res.vertices.collect().foreach(println)
  }
}

输出:(finalId firstId)

代码语言:javascript
复制
(100,Value(100))
(4,Value(1))
(300,Value(100))
(200,Value(100))
(40,Value(30))
(20,Value(10))
(1,Value(1))
(30,Value(30))
(10,Value(10))
(2,Value(1))
(3,Value(1))
(31,Value(10))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50090792

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档