首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark分组和最大值选择

PySpark分组和最大值选择
EN

Stack Overflow用户
提问于 2016-11-30 21:23:39
回答 2查看 20.4K关注 0票数 5

我有一个像这样的PySpark数据帧

代码语言:javascript
复制
 name   city     date
 satya  Mumbai  13/10/2016
 satya  Pune    02/11/2016
 satya  Mumbai  22/11/2016
 satya  Pune    29/11/2016
 satya  Delhi   30/11/2016
 panda  Delhi   29/11/2016
 brata  BBSR    28/11/2016
 brata  Goa     30/10/2016
 brata  Goa     30/10/2016

我需要为每个名称找出最喜欢的城市,逻辑是“如果城市在聚合‘名称’+‘城市’对上有最大数量的城市出现,则将城市作为fav_city”。如果发现多个相同的事件,则考虑具有最新日期的城市。WIll解释:

代码语言:javascript
复制
d = df.groupby('name','city').count()
#name  city  count
brata Goa    2  #clear favourite
brata BBSR   1
panda Delhi  1  #as single so clear favourite
satya Pune   2  ##Confusion
satya Mumbai 2  ##confusion
satya Delhi  1   ##shd be discard as other cities having higher count than this city

#So get cities having max count
dd = d.groupby('name').agg(F.max('count').alias('count'))
ddd = dd.join(d,['name','count'],'left')
#name  count  city
 brata    2   Goa    #fav found
 panda    1   Delhi  #fav found
 satya    2   Mumbai #can't say
 satya    2   Pune   #can't say

在用户'satya‘的情况下,我需要返回trx_history并获取具有equal_max计数的城市的最新日期:从’孟买‘或’浦那‘,这是最后一次交易(最大日期),认为该城市为fav_city。在这种情况下,'Pune‘as '29/11/2016’是最新/最大日期。

但我无法进一步说明如何做到这一点。

请帮助我的逻辑或如果有更好的解决方案(更快/紧凑的方式),请建议。谢谢。

EN

回答 2

Stack Overflow用户

发布于 2016-11-30 23:22:58

首先将日期转换为DateType

代码语言:javascript
复制
import pyspark.sql.functions as F

df_with_date = df.withColumn(
    "date",
    F.to_date("date", "dd/MM/yyyy")
    # For Spark < 2.2
    # F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)

下一个groupBy用户和城市,但像这样扩展聚合:

代码语言:javascript
复制
df_agg = (df_with_date
    .groupBy("name", "city")
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date")))

定义窗口:

代码语言:javascript
复制
from pyspark.sql.window import Window

w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))

添加排名:

代码语言:javascript
复制
df_with_rank = (df_agg
    .withColumn("rank", F.dense_rank().over(w)))

和过滤器:

代码语言:javascript
复制
result = df_with_rank.where(F.col("rank") == 1)

您可以使用如下代码来检测剩余的重复项:

代码语言:javascript
复制
import sys

final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
result.withColumn("tie", F.count("*").over(final_w) != 1)
票数 13
EN

Stack Overflow用户

发布于 2021-04-18 07:05:58

代码语言:javascript
复制
d = df.groupby('name','city').count()
#name  city  count
brata Goa    2  #clear favourite
brata BBSR   1
panda Delhi  1  #as single so clear favourite
satya Pune   2  ##Confusion
satya Mumbai 2  ##confusion
satya Delhi  1   ##shd be discard as other cities having higher count than this city

#So get cities having max count
dd = d.groupby('name').count().sort(F.col('count').desc())
display(dd.take(1))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40889564

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档