首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Hazelcast-jet的最大聚合

使用Hazelcast-jet的最大聚合
EN

Stack Overflow用户
提问于 2019-03-19 20:18:35
回答 1查看 117关注 0票数 0

我想对整个数据集进行简单的最大值运算。我从Kafka示例开始:https://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.7-maintenance/kafka/src/main/java/avro/KafkaAvroSource.java

我刚刚将管道更改为:

代码语言:javascript
复制
p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
    .map(Map.Entry::getValue)
    .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
    .map(user -> (Integer) user.get(2))
    .drainTo(Sinks.list("result"));

和转到:

代码语言:javascript
复制
IListJet<Integer> res = jet.getList("result");
SECONDS.sleep(10);
System.out.println(res.get(0));
SECONDS.sleep(15);
System.out.println(res.get(0));
cancel(job);

以获取该主题中最大年龄的人。然而,它不会返回20,而且似乎在不同的运行中返回不同的值。知道为什么吗?

EN

回答 1

Stack Overflow用户

发布于 2019-03-19 20:50:04

您似乎在使用rollingAggregate,它在每次接收到一些输入时都会生成一个新的输出项,但是您所检查的只是它发出的第一个项。相反,您必须找到它发出的最新项。一种方法是将结果推送到IMap接收器中,每次使用相同的键:

代码语言:javascript
复制
p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
 .withoutTimestamps()
 .map(Map.Entry::getValue)
 .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
 .map(user -> entry("user", (Integer) user.get(2)))
 .drainTo(Sinks.map("result"));

您可以使用以下命令获取最新结果

代码语言:javascript
复制
IMap<String, Integer> result = jet.getMap("result");
System.out.println(result.get("user");
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55240930

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档