I am doing a groupBy to compute a value, but it seems that when I group, I lose every field that is not part of the aggregation key:
filtered.filterNot('site) {s:String => ...}
.filterNot('date) {s:String => ...}
aggr = filtered.groupBy('id, 'contentHost) { group =>
group.min('timestamp -> 'min)
//how do I keep original fields? (eg: site, date)
}
aggr.store(Tsv(...)) //eg: field "site" won't be here
In Pig, it might look like this:
aggr = group filtered by concat('id, 'contentHost);
result = foreach aggr {
generate flatten(filtered), //how to do this in scalding?
min(filtered.timestamp) as min;
}
Posted on 2014-04-14 21:16:12
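I ran into the same problem with the tuple API and could only solve it by switching to the typed API.
You can use Scala tuples, or define your own case class outside the job. For example: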
case class Data(id: String, site: String, date: String, contentHost: String)
Then you would process it like this:
val filtered: TypedPipe[Data] = TypedPipe.from(Seq(Data("...", "...", "2014-04-14", "..."))) // argument order: id, site, date, contentHost
filtered
.filterNot ( data => data.site == "fr" )
.filterNot ( data => data.date == "2014-02-01" )
.groupBy (data => (data.id, data.contentHost)) // (String,String) -> Data
.min // needs an implicit Ordering[Data] in scope; or use .minBy { ... }
.toTypedPipe
.write(TypedTsv[((String, String), Data)]("/path/"))
https://stackoverflow.com/questions/22941265
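To illustrate why the typed approach in the answer keeps the other fields, here is a minimal sketch that uses ordinary Scala collections in place of a Scalding pipe. The `Record` case class, its `timestamp` field, the `KeepFieldsSketch` object and the sample values are all assumptions made for illustration; none of them come from the original question or answer. The point is only that grouping whole records and picking the minimum by one field returns the full record, so `site` and `date` are still available.

```scala
// Plain Scala collections stand-in for the typed-pipe logic (no Scalding required).
// Record and its timestamp field are hypothetical, chosen only for this sketch.
case class Record(id: String, site: String, date: String, contentHost: String, timestamp: Long)

object KeepFieldsSketch {
  // For each (id, contentHost) key, keep the whole record with the smallest timestamp.
  def earliestPerKey(records: Seq[Record]): Map[(String, String), Record] =
    records
      .groupBy(r => (r.id, r.contentHost))
      .map { case (key, group) => key -> group.minBy(_.timestamp) }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows; two of them share the key ("a", "host1").
    val sample = Seq(
      Record("a", "us", "2014-04-14", "host1", 30L),
      Record("a", "uk", "2014-04-13", "host1", 10L),
      Record("b", "de", "2014-04-12", "host2", 20L)
    )
    // Every surviving record still carries site and date next to the minimum timestamp.
    earliestPerKey(sample).foreach { case (key, rec) =>
      println(s"$key -> site=${rec.site}, date=${rec.date}, min=${rec.timestamp}")
    }
  }
}
```

In a Scalding job the same idea would presumably map onto `groupBy` followed by `minBy` on a `TypedPipe`, as the answer's comment suggests, but that mapping is not verified here.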