Hi, I have created a graph from vertex and edge files. The size of the graph is 600. I am querying this graph using the motif feature of Spark GraphFrames, and I have set up an AWS EMR cluster to run the queries.
Cluster details: 1 master, 8 slaves

Master node:
m5.xlarge
4 vCore, 16 GiB memory, EBS only storage
EBS Storage: 64 GiB

Slave nodes:
m5.4xlarge
16 vCore, 64 GiB memory, EBS only storage
EBS Storage: 256 GiB (per instance)

I am seeing very high shuffle read (3.4 TB) and shuffle write (2 TB), which hurts performance: it takes about 50 minutes to execute 10 queries. Is there any way to reduce such heavy shuffling?
Below is my Spark code:
import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

val spark = SparkSession.builder.appName("SparkGraph POC").getOrCreate()
// vertexDf and edgeDf are loaded elsewhere from the vertex and edge files
val g: GraphFrame = GraphFrame(vertexDf, edgeDf)

// Query 1: two-hop "knows" chains
val q1 = g.find("(a)-[r1]->(b); (b)-[r2]->(c)")
q1.filter(
    " r1.relationship = 'knows' and" +
    " r2.relationship = 'knows'").distinct()
  .createOrReplaceTempView("q1table")
spark.sql("select a.id as a_id, a.name as a_name," +
    " b.id as b_id, b.name as b_name," +
    " c.id as c_id, c.name as c_name from q1table")
  .write
  .option("quote", "\"")
  .option("escape", "\"")
  .option("header", "true")
  .csv(resFilePath + "/q1")
spark.catalog.uncacheTable("q1table")

// Query 2: four-hop "knows" chain between two fixed users
val q2 = g.find("(a)-[r1]->(b); (b)-[r2]->(c); (c)-[r3]->(d); (d)-[r4]->(e)")
q2.filter(
    " a.name = 'user1' and" +
    " e.name = 'user4' and" +
    " r1.relationship = 'knows' and" +
    " r2.relationship = 'knows' and" +
    " r3.relationship = 'knows' and" +
    " r4.relationship = 'knows'").distinct()
  .createOrReplaceTempView("q2table")
spark.sql("select a.id as a_id, a.name as a_name," +
    " e.id as e_id, e.name as e_name from q2table")
  .write
  .option("quote", "\"")
  .option("escape", "\"")
  .option("header", "true")
  .csv(resFilePath + "/q2")
spark.catalog.uncacheTable("q2table")

spark.stop()

Posted on 2020-06-22 04:51:33
The problem with the GraphFrames implementation is that it self-joins the internal DataFrames as many times as there are edges in your motif. That means the longer the chain, the more shuffling you get.
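The self-join behaviour can be sketched with plain Scala collections (a toy model I wrote for illustration, not GraphFrames' actual code): resolving a k-edge motif amounts to joining the edge set with itself k-1 times, so the intermediate result, and hence the shuffled data in a real Spark job, grows with every extra hop.

```scala
// Toy model of motif resolution: each extra hop is one more self-join
// over the edge list, and the intermediate row count keeps growing.
object MotifJoinSketch {
  // edges as (src, dst) pairs; every node "knows" the next two nodes
  val edges: Seq[(Int, Int)] = (0 until 100).flatMap(i => Seq((i, i + 1), (i, i + 2)))

  // one self-join step: extend each partial chain by every matching edge
  def extend(chains: Seq[Vector[Int]]): Seq[Vector[Int]] =
    for {
      chain      <- chains
      (src, dst) <- edges
      if chain.last == src
    } yield chain :+ dst

  // a chain of k edges needs k-1 join steps over the edge list
  def chainsOfLength(k: Int): Seq[Vector[Int]] = {
    val start = edges.map { case (s, d) => Vector(s, d) }
    (1 until k).foldLeft(start)((acc, _) => extend(acc))
  }

  def main(args: Array[String]): Unit =
    (1 to 4).foreach { k =>
      println(s"chain length $k -> ${chainsOfLength(k).size} intermediate rows")
    }
}
```

In a distributed setting every one of those join steps is a shuffle over the full (intermediate) edge data, which is why the 4-hop query above is so much more expensive than the 2-hop one.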
You can find more details at https://www.waitingforcode.com/apache-spark-graphframes/motifs-finding-graphframes/read
I tried something similar and saw that once the chain length exceeded 12, Spark stopped responding and connections to the executors were lost, even after I increased the resources.
If that is the kind of query you are trying to run, I would suggest using a graph database instead.
Hope this helps.
https://stackoverflow.com/questions/62498702