Say I have two Spark dataframes, users and shops. A few sample rows from each are shown below.
Users data:
+---------+-------------+---------+
| idvalue | day-of-week | geohash |
+---------+-------------+---------+
| id-1 | 2 | gcutjjn |
| id-1 | 3 | gcutjjn |
| id-1 | 5 | gcutjht |
+---------+-------------+---------+
Shops data:
+---------+-----------+---------+
| shop-id | shop-name | geohash |
+---------+-----------+---------+
| sid-1 | kfc | gcutjjn |
| sid-2 | mcd | gcutjhq |
| sid-3 | starbucks | gcutjht |
+---------+-----------+---------+
I need to join these two dataframes on geohash. I could of course do a plain equi-join, but the users dataframe is huge, with billions of rows, and geohashes are likely to repeat within and across idvalues. So I am wondering whether there is a way to join the unique geohashes of the users dataframe against the geohashes of the shops dataframe. If we can do that, it would then be easy to replicate the shop entries to match the geohashes in the resulting dataframe.
It could probably be done with a pandas UDF: I would groupBy on users.idvalue, take only the first row of each group (since all ids within a group are identical), build a one-row dataframe, and join it with shops inside the UDF. Logically this should work, but I am not sure about performance, since UDFs are usually slower than Spark's native transformations. Any ideas are welcome.
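To make the idea concrete, here is roughly what I have in mind, sketched in Scala (untested; userDf and shopDf have the schemas shown above, and whether this actually helps is exactly my question):

import org.apache.spark.sql.functions.broadcast

// Join shops once against the (much smaller) set of distinct user geohashes.
val distinctHashes = userDf.select("geohash").distinct()
val hashToShop = distinctHashes.join(shopDf, Seq("geohash"), "inner")

// Map the matches back onto the full users dataframe. If hashToShop is small
// enough, broadcasting it avoids shuffling the billions of user rows.
val result = userDf.join(broadcast(hashToShop), Seq("geohash"), "inner")

Whether this beats a plain equi-join presumably depends on how much distinct() shrinks the hash set; Spark may also broadcast the shop side automatically if it falls below spark.sql.autoBroadcastJoinThreshold.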
Posted on 2019-10-16 14:36:59
You said your users dataframe is huge and that "geohashes are likely to repeat within and across idvalues". However, you didn't mention whether duplicate geohashes may also exist in your shops dataframe.
If there are no repeated hashes in the latter, I think a simple join would solve your problem:
val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht")).toDF("shop_id","shop_name","geohash")
userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
| id-1| 2|gcutjjn|
| id-2| 2|gcutjjn|
| id-1| 3|gcutjjn|
| id-1| 5|gcutjht|
+-------+-----------+-------+
shopDf.show
+-------+---------+-------+
|shop_id|shop_name|geohash|
+-------+---------+-------+
| sid-1| kfc|gcutjjn|
| sid-2| mcd|gcutjhq|
| sid-3|starbucks|gcutjht|
+-------+---------+-------+
shopDf
.join(userDf,Seq("geohash"),"inner")
.groupBy($"geohash",$"shop_id",$"idvalue")
.agg(collect_list($"day_of_week").alias("days"))
.show
+-------+-------+-------+------+
|geohash|shop_id|idvalue| days|
+-------+-------+-------+------+
|gcutjjn| sid-1| id-1|[2, 3]|
|gcutjht| sid-3| id-1| [5]|
|gcutjjn| sid-1| id-2| [2]|
+-------+-------+-------+------+
If you do have duplicated hash values in your shops dataframe, one possible approach is to remove those duplicated hashes from the shops dataframe (if your requirements allow it) and then perform the same join operation.
val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht"),("sid-4","burguer king","gcutjjn")).toDF("shop_id","shop_name","geohash")
userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
| id-1| 2|gcutjjn|
| id-2| 2|gcutjjn|
| id-1| 3|gcutjjn|
| id-1| 5|gcutjht|
+-------+-----------+-------+
shopDf.show
+-------+------------+-------+
|shop_id| shop_name|geohash|
+-------+------------+-------+
| sid-1| kfc|gcutjjn| << Duplicated geohash
| sid-2| mcd|gcutjhq|
| sid-3| starbucks|gcutjht|
| sid-4|burguer king|gcutjjn| << Duplicated geohash
+-------+------------+-------+
//Dataframe with hashes to exclude:
val excludedHashes = shopDf.groupBy("geohash").count.filter("count > 1")
excludedHashes.show
+-------+-----+
|geohash|count|
+-------+-----+
|gcutjjn| 2|
+-------+-----+
//Create a dataframe of shops without the ones with duplicated hashes
val cleanShopDf = shopDf.join(excludedHashes,Seq("geohash"),"left_anti")
cleanShopDf.show
+-------+-------+---------+
|geohash|shop_id|shop_name|
+-------+-------+---------+
|gcutjhq| sid-2| mcd|
|gcutjht| sid-3|starbucks|
+-------+-------+---------+
//Perform the same join operation
cleanShopDf.join(userDf,Seq("geohash"),"inner")
.groupBy($"geohash",$"shop_id",$"idvalue")
.agg(collect_list($"day_of_week").alias("days"))
.show
+-------+-------+-------+----+
|geohash|shop_id|idvalue|days|
+-------+-------+-------+----+
|gcutjht| sid-3| id-1| [5]|
+-------+-------+-------+----+
The code provided is written in Scala, but it can easily be translated to Python.
Hope this helps!
Posted on 2019-10-16 11:20:15
Here's an idea, if it's feasible: use PySpark to select the distinct geohashes and register them as a temporary table, then join against that table instead of the full dataframes.
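A minimal sketch of that idea, written in Scala to match the rest of the thread (createOrReplaceTempView and spark.sql work the same way in PySpark; userDf and shopDf are the dataframes from the examples above):

// Register the distinct user geohashes and the shops as temporary views.
userDf.select("geohash").distinct().createOrReplaceTempView("user_hashes")
shopDf.createOrReplaceTempView("shops")

// Join the deduplicated hashes against the shops in SQL; the result can then
// be joined back to the full users dataframe, as in the other answer.
val hashToShop = spark.sql("""
  SELECT u.geohash, s.shop_id, s.shop_name
  FROM user_hashes u
  JOIN shops s ON u.geohash = s.geohash
""")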
https://stackoverflow.com/questions/58410552