I have user information in an RDD:
(Id:10, Name:bla, Adress:50, ...)
I have another collection containing the successive identity changes we have collected for each user.
(lastId, newId)
(10, 43)
(85, 90)
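(43, 50)
In this example, I need to get the last identity for each user ID:
getFinalIdentity(10) = 50 (10 -> 43 -> 50)
For a while, I used a broadcast variable containing these identities and iterated over the collection to get the final ID.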
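To make the chain lookup concrete, here is a minimal plain-Scala sketch of following the (lastId, newId) pairs until no successor is left. The object name `FinalIdentity`, the `changes` map, and the `seen` cycle guard are assumptions added for illustration, not the asker's code; in Spark the map would presumably be wrapped with `sc.broadcast(...)` and read inside a `map` over the user RDD.

```scala
import scala.annotation.tailrec

object FinalIdentity {
  // Hypothetical lookup table of (lastId -> newId) pairs from the example data.
  val changes: Map[Long, Long] = Map(10L -> 43L, 85L -> 90L, 43L -> 50L)

  // Follow the chain until an ID has no successor.
  // `seen` is an added guard so that an accidental cycle cannot loop forever.
  @tailrec
  def getFinalIdentity(id: Long, seen: Set[Long] = Set.empty): Long =
    changes.get(id) match {
      case Some(next) if !seen.contains(next) => getFinalIdentity(next, seen + id)
      case _                                  => id
    }
}
```

Each lookup walks the whole chain, so the cost grows with chain length, which may be part of why this approach feels slow on long histories.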
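I came up with a solution that stores the identities in an RDD and iterates over it recursively, but it is not fast and seems very complicated to me.
Is there an elegant and fast way to do this?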
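Posted on 2018-04-30 07:00:30
Have you considered a graph?
You can build a graph from the list of edges (lastId, newId). That way, a node with no outgoing edges is the final ID for the node with no incoming edges at the start of the same chain.
This can be done in Spark with GraphX.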
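As a small illustration of the "no outgoing edges" idea, the following plain-Scala sketch (no Spark involved) finds the chain ends in the question's sample data. The object name `ChainEnds` and its fields are hypothetical names chosen for this sketch.

```scala
object ChainEnds {
  // Edge list (lastId, newId) taken from the question's example.
  val edges: Seq[(Long, Long)] = Seq((10L, 43L), (85L, 90L), (43L, 50L))

  val sources: Set[Long] = edges.map(_._1).toSet // IDs that were replaced at some point
  val targets: Set[Long] = edges.map(_._2).toSet // IDs that replaced another ID

  // Nodes with no outgoing edge are never replaced: these are the final IDs.
  val finalIds: Set[Long] = targets -- sources

  // Nodes with no incoming edge never replaced anything: these start a chain.
  val firstIds: Set[Long] = sources -- targets
}
```

This only identifies the ends of the chains; pairing each ID with the end of its own chain is the part that needs message passing along the edges.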
Here is an example. For each ID, it shows the first ID in its chain. That is, for the ID changes (1 -> 2 -> 3), the result will be (1, 1), (2, 1), (3, 1).
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    sc.setLogLevel("ERROR")

    // RDD of pairs (oldId, newId)
    val changedIds = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (10L, 20L), (20L, 31L), (30L, 40L), (100L, 200L), (200L, 300L)))

    // Case classes for the Pregel operation
    case class Value(originId: VertexId) // vertex value
    case class Message(value: VertexId) // message sent from one vertex to another

    // Create the graph from the ID pairs
    val graph = Graph.fromEdgeTuples(changedIds, Value(0))

    // The initial message is sent to all vertices at the start
    val initialMsg = Message(0)

    // How a vertex should process a received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      // The initial message has value 0. In that case the vertex initializes its value to its own ID
      // (this assumes 0 is never used as a real ID)
      if (msg.value == 0) Value(vertexId)
      // Otherwise the received value is the origin ID
      else Value(msg.value)
    }

    // How vertices should send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // For each triplet, a single message is sent to the destination vertex
      // Its payload is the origin ID of the source vertex
      Iterator((triplet.dstId, Message(triplet.srcAttr.originId)))
    }

    // How incoming messages to one vertex should be merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // Generally this case is an error,
      // because one ID can't have 2 different origin IDs
      msg2 // Just return any of the incoming messages
    }

    // Kick off the Pregel calculation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    // Print results
    res.vertices.collect().foreach(println)
  }
}
Output (id, firstId):
(100,Value(100))
(4,Value(1))
(300,Value(100))
(200,Value(100))
(40,Value(30))
(20,Value(10))
(1,Value(1))
(30,Value(30))
(10,Value(10))
(2,Value(1))
(3,Value(1))
(31,Value(10))
https://stackoverflow.com/questions/50090792
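The example in the answer propagates the first ID of each chain, while the question asks for the last one. One possible adaptation, sketched here under assumptions rather than taken from the answer, is to run the same kind of Pregel computation on the reversed graph so that the final ID flows back toward older IDs. The object name `FinalIds`, the use of plain `Long` vertex values instead of case classes, and the reuse of the question's sample pairs are choices made for this sketch; it also assumes 0 is never a real ID and that the chains contain no cycles.

```scala
import org.apache.spark.graphx.{EdgeDirection, Graph}
import org.apache.spark.{SparkConf, SparkContext}

object FinalIds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("final-ids").setMaster("local[*]"))
    sc.setLogLevel("ERROR")

    // Pairs (lastId, newId) from the question
    val changedIds = sc.parallelize(Seq((10L, 43L), (85L, 90L), (43L, 50L)))

    // Reverse every edge so that messages travel from newer IDs to older ones
    val reversed = Graph.fromEdgeTuples(changedIds, 0L).reverse

    val res = reversed.pregel(0L, Int.MaxValue, EdgeDirection.Out)(
      // Initial message (0): start with the vertex's own ID; otherwise adopt the received ID
      (id, value, msg) => if (msg == 0L) id else msg,
      // Forward the currently known final ID to the older ID
      triplet => Iterator((triplet.dstId, triplet.srcAttr)),
      // Two successors for one old ID would be a data error; keep either message
      (a, b) => b
    )

    // Each element is (id, finalId); for the sample data, ID 10 should map to 50
    res.vertices.collect().foreach(println)
    sc.stop()
  }
}
```

The resulting `res.vertices` RDD of (id, finalId) pairs could then be joined with the user RDD on the user ID, which avoids walking each chain per user.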