我要做的是将流按两个字段("remote-client-ip", "request-params")分组,并计算每个组中的元组数。把它们合并成一张地图。以下是我的拓扑结构:
topology.newStream("kafka-spout-stream-1", repeatSpout)
.each(new Fields("str"), new URLParser(), new Fields(fieldNames))
.each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
.groupBy(new Fields("remote-client-ip", "query-string"))
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
.groupBy(new Fields("remote-client-ip"))
.persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));但是经过调试,我发现数据流在第一次groupBy()时被阻塞了,这是一个多字段分组.在随后的聚合语句中,没有为Count()执行任何操作。
所以我想我误解了一些关于多字段分组和聚合之间的相互作用的概念。
请让我知道我的猜测是对是错。谢谢!
发布于 2015-04-17 16:15:56
您正在将已经分组的字段与拓扑中的Aggregate()函数进行分组。试试这个:
.aggregate(new Count(), new Fields("user-word-count"))而不是这样:
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))https://stackoverflow.com/questions/29681542
复制相似问题