首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用一列中的唯一值连接两个pyspark数据格式

用一列中的唯一值连接两个pyspark数据格式
EN

Stack Overflow用户
提问于 2019-10-16 09:55:19
回答 2查看 2.4K关注 0票数 0

比方说,我有两个电火花数据格式,usersshops。下面显示了这两个数据的几行示例。

用户数据:

代码语言:javascript
复制
+---------+-------------+---------+
| idvalue | day-of-week | geohash |
+---------+-------------+---------+
| id-1    |           2 | gcutjjn |
| id-1    |           3 | gcutjjn |
| id-1    |           5 | gcutjht |
+---------+-------------+---------+

商店数据

代码语言:javascript
复制
+---------+-----------+---------+
| shop-id | shop-name | geohash |
+---------+-----------+---------+
| sid-1   | kfc       | gcutjjn |
| sid-2   | mcd       | gcutjhq |
| sid-3   | starbucks | gcutjht |
+---------+-----------+---------+

我需要在Geo散列上加入这两种数据格式。当然,我可以做一个简单的equi连接,但是用户的dataframe是巨大的,包含了数十亿行,在idvalue内部和之间,geohashes可能会重复。因此,我想知道是否有一种方法可以对用户中的唯一地理哈希( dataframe )和商店中的geohashes执行连接( dataframe )。如果我们能够做到这一点,那么就很容易复制商店条目,以匹配由此生成的dataframe中的geohashes。

它可能可以通过熊猫udf实现,我将在users.idvalue上执行groupby,只从组中获取第一行(因为组内的所有I都是相同的),并创建一行数据,与udf内的商店进行连接。从逻辑上看,这是应该工作的,但在性能方面不确定,因为udf(s)通常比触发本机转换慢。任何想法都欢迎。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-10-16 14:36:59

你说过你的用户数据是巨大的,并且“地理哈希很可能在idvalue内部和之间重复”。但是,如果在您的商店中可能存在重复的geohashes,则没有提到dataframe。

如果在后者中没有重复的散列,我认为简单的连接将解决您的问题:

代码语言:javascript
复制
val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht")).toDF("shop_id","shop_name","geohash")

userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
|   id-1|          2|gcutjjn|
|   id-2|          2|gcutjjn|
|   id-1|          3|gcutjjn|
|   id-1|          5|gcutjht|
+-------+-----------+-------+

shopDf.show
+-------+---------+-------+
|shop_id|shop_name|geohash|
+-------+---------+-------+
|  sid-1|      kfc|gcutjjn|
|  sid-2|      mcd|gcutjhq|
|  sid-3|starbucks|gcutjht|
+-------+---------+-------+

shopDf
    .join(userDf,Seq("geohash"),"inner")
    .groupBy($"geohash",$"shop_id",$"idvalue")
    .agg(collect_list($"day_of_week").alias("days"))
    .show
+-------+-------+-------+------+
|geohash|shop_id|idvalue|  days|
+-------+-------+-------+------+
|gcutjjn|  sid-1|   id-1|[2, 3]|
|gcutjht|  sid-3|   id-1|   [5]|
|gcutjjn|  sid-1|   id-2|   [2]|
+-------+-------+-------+------+

如果您有重复的散列值在您的商店数据same中,一种可能的方法是从您的商店dataframe中删除那些重复的散列(如果您的需求允许的话),然后执行相同的联接操作。

代码语言:javascript
复制
val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht"),("sid-4","burguer king","gcutjjn")).toDF("shop_id","shop_name","geohash")

userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
|   id-1|          2|gcutjjn|
|   id-2|          2|gcutjjn|
|   id-1|          3|gcutjjn|
|   id-1|          5|gcutjht|
+-------+-----------+-------+

shopDf.show
+-------+------------+-------+
|shop_id|   shop_name|geohash|
+-------+------------+-------+
|  sid-1|         kfc|gcutjjn|  <<  Duplicated geohash
|  sid-2|         mcd|gcutjhq|
|  sid-3|   starbucks|gcutjht|
|  sid-4|burguer king|gcutjjn|  <<  Duplicated geohash
+-------+------------+-------+

//Dataframe with hashes to exclude:
val excludedHashes = shopDf.groupBy("geohash").count.filter("count > 1")
excludedHashes.show
+-------+-----+
|geohash|count|
+-------+-----+
|gcutjjn|    2|
+-------+-----+

//Create a dataframe of shops without the ones with duplicated hashes
val cleanShopDf = shopDf.join(excludedHashes,Seq("geohash"),"left_anti")
cleanShopDf.show
+-------+-------+---------+
|geohash|shop_id|shop_name|
+-------+-------+---------+
|gcutjhq|  sid-2|      mcd|
|gcutjht|  sid-3|starbucks|
+-------+-------+---------+

//Perform the same join operation
cleanShopDf.join(userDf,Seq("geohash"),"inner")
    .groupBy($"geohash",$"shop_id",$"idvalue")
    .agg(collect_list($"day_of_week").alias("days"))
    .show
+-------+-------+-------+----+
|geohash|shop_id|idvalue|days|
+-------+-------+-------+----+
|gcutjht|  sid-3|   id-1| [5]|
+-------+-------+-------+----+

提供的代码是用Scala编写的,但可以很容易地转换为Python。

希望这能有所帮助!

票数 1
EN

Stack Overflow用户

发布于 2019-10-16 11:20:15

如果可能的话,这是一个想法,如果可能的话,您可以使用pyspark来选择不同的Geo散列并创建tempory表。然后从这个表中加入,而不是dataframes。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58410552

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档