Suppose a DataFrame has two columns, C1 and C2:
+---+-----+
|C1 | C2 |
+---+-----+
|A | B |
|C | D |
|A | E |
|E | F |
+---+-----+

My goal is to collect the connected values into arrays of intersections:
+--------------+
| intersections|
+--------------+
|[A, B, E, F] |
|[C, D] |
+--------------+

If the DataFrame has a very large number of rows (around 1 billion), how can this be done efficiently?
Posted on 2021-09-23 14:52:39
The solution is the GraphFrames library (https://graphframes.github.io/graphframes/docs/_site/index.html).
Disclaimer: tested with Spark 2.4.4 and GraphFrames 0.7.0.
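This merging task is a connected-components problem: treat every value as a graph vertex and every row as an edge, then each connected component is one result array. As a minimal single-machine sketch of the idea (the ConnectedComponentsSketch object and its union-find helpers are illustrative, not part of GraphFrames, and will not scale to a billion rows):

```scala
object ConnectedComponentsSketch {
  // Group values into connected components using union-find.
  def components(edges: Seq[(String, String)]): Set[Set[String]] = {
    val parent = scala.collection.mutable.Map[String, String]()
    // Find the representative of x, compressing the path as we go.
    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x else { val root = find(p); parent(x) = root; root }
    }
    // Merge the components containing a and b.
    def union(a: String, b: String): Unit = parent(find(a)) = find(b)
    edges.foreach { case (a, b) => union(a, b) }
    // Every key grouped by its representative forms one component.
    parent.keys.toSeq.groupBy(find).values.map(_.toSet).toSet
  }
}
```

For the sample data, `components(Seq(("A", "B"), ("C", "D"), ("A", "E"), ("E", "F")))` yields `Set(Set("A", "B", "E", "F"), Set("C", "D"))`, matching the expected output. At billion-row scale a distributed implementation such as the GraphFrames answer below is needed.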
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame

object SparkApp extends App {
  val appName = "appName"
  val master = "local[*]"

  val spark = SparkSession
    .builder
    .appName(appName)
    .master(master)
    .getOrCreate

  import spark.implicits._

  val dataTest =
    Seq(
      ("A", "B"),
      ("C", "D"),
      ("A", "E"),
      ("E", "F")
    ).toDF("C1", "C2")

  // A checkpoint directory is mandatory for GraphFrames' connectedComponents
  spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")

  // Build the graph: vertices are the distinct values of both columns,
  // edges are the (C1, C2) pairs
  val graphTest: GraphFrame =
    GraphFrame(
      dataTest.select('C1 as "id").union(dataTest.select('C2 as "id")).distinct,
      dataTest.withColumnRenamed("C1", "src").withColumnRenamed("C2", "dst")
    )

  val graphComponentsTest = graphTest.connectedComponents.run()

  // Group vertices by their component id to get the merged sets
  val clustersResultTestDF =
    graphComponentsTest
      .groupBy("component")
      .agg(collect_list("id") as "intersections")

  clustersResultTestDF.show
}

The output is:
+--------------+
| intersections|
+--------------+
|[A, B, E, F] |
|[C, D] |
+--------------+

Source: https://stackoverflow.com/questions/69213835