I have the following Spark DataFrame:
agent_product_sale <- data.frame(agent = c('a','b','c','d','e','f','a','b','c','a','b'),
                                 product = c('P1','P2','P3','P4','P1','p1','p2','p2','P2','P3','P3'),
                                 sale_amount = c(1000,2000,3000,4000,1000,1000,2000,2000,2000,3000,3000))
RDD_aps <- createDataFrame(sqlContext, agent_product_sale)
agent product sale_amount
1 a P1 1000
2 b P1 1000
3 c P3 3000
4 d P4 4000
5 d P1 1000
6 c P1 1000
7 a P2 2000
8 b P2 2000
9 c P2 2000
10 a P4 4000
11     b      P3        3000

I need to group the Spark DataFrame by agent and find, for each agent, the product with the highest sale_amount:
agent most_expensive
a P4
b P3
c P3
d     P4

I used the following code, but it returns the maximum sale_amount for each agent instead of the product:
schema <- structType(structField("agent", "string"),
                     structField("max_sale_amount", "double"))
result <- gapply(
RDD_aps,
c("agent"),
function(key, x) {
y <- data.frame(key,max(x$sale_amount), stringsAsFactors = FALSE)
  }, schema)

Posted on 2016-09-06 15:29:50
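A minimal fix to the `gapply` approach above (a sketch, assuming the same SparkR 2.0 `gapply` signature used in the question) is to declare the result column as a string and have the per-group function return the product at the row with the maximal sale_amount:

```r
# Sketch: return the product name for the max sale, not the max amount itself.
schema <- structType(structField("agent", "string"),
                     structField("most_expensive", "string"))
result <- gapply(
  RDD_aps,
  c("agent"),
  function(key, x) {
    # which.max picks the index of the largest sale_amount in this group
    data.frame(key, x$product[which.max(x$sale_amount)],
               stringsAsFactors = FALSE)
  }, schema)
```

Note that if two products tie for the maximum, `which.max` keeps only the first one it encounters.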
ar1 <- arrange(RDD_aps, desc(RDD_aps$sale_amount))
collect(summarize(groupBy(ar1, ar1$agent), most_expensive = first(ar1$product)))

Posted on 2016-09-06 14:49:04
You can use tapply() or aggregate() to find the maximum value within each group:
agent_product_sale <- data.frame(agent = c('a','b','c','d','e','f','a','b','c','a','b'),
                                 product = c('P1','P2','P3','P4','P1','p1','p2','p2','P2','P3','P3'),
                                 sale_amount = c(1000,2000,3000,4000,1000,1000,2000,2000,2000,3000,3000))
tapply(agent_product_sale$sale_amount,agent_product_sale$agent, max)
a b c d e f
3000 3000 3000 4000 1000 1000
aggregate(agent_product_sale$sale_amount,by=list(agent_product_sale$agent), max)
Group.1 x
1 a 3000
2 b 3000
3 c 3000
4 d 4000
5 e 1000
6       f 1000

aggregate returns a data.frame and tapply returns an array; which one you use depends on how you prefer to process the result further.
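Neither call above returns the product, only the maximum amount. A base-R sketch that recovers the product as well (using the `agent_product_sale` data frame defined above) picks, within each group, the row index of the largest sale_amount and then subsets the original data frame:

```r
# For each agent, find the row index of its largest sale_amount,
# then look up the corresponding product.
idx <- tapply(seq_len(nrow(agent_product_sale)),
              agent_product_sale$agent,
              function(i) i[which.max(agent_product_sale$sale_amount[i])])
agent_product_sale[idx, c("agent", "product")]
```

As with the Spark version, ties are broken by keeping the first matching row.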
https://stackoverflow.com/questions/39341942